ps/Modules/Alkami.PowerShell.Common/Public/Invoke-Parallel2.ps1
2023-05-30 22:51:22 -07:00

342 lines
15 KiB
PowerShell

function Invoke-Parallel2 {
    <#
    .SYNOPSIS
        Executes a script block against a list of objects in parallel with PSJobs.
        Returns explicit results with Result/Object/Success/Error properties so the caller can deal with failures as they see fit.
    .DESCRIPTION
        Every input object is run through the user script inside a PSJob (Start-Job). The user script's output is
        captured in a result record:
            Object  - the input object the script was invoked with
            Result  - whatever the user script emitted
            Success - $true unless the user script threw a terminating error
            Error   - the caught error record when Success is $false, otherwise $null

        Two scheduling modes exist:
          * -ThreadPerObject: one job per object, with at most NumThreads jobs outstanding at a time.
          * default (batched): the objects are split into at most NumThreads contiguous batches, one job per batch.

        Failures are summarised with Write-Warning rather than thrown, so the caller decides how to react by
        inspecting the returned records.
    .PARAMETER Objects
        The objects to be operated on in parallel. A null or empty collection returns immediately.
    .PARAMETER Script
        The script to be executed in parallel. The first param is implicitly one of the $Objects, and the second param is your $Arguments array.
        Inside the job the value is turned back into a script block with [scriptblock]::Create().
    .PARAMETER Arguments
        Global arguments to be handed to each job.
    .PARAMETER NumThreads
        The level of parallelism. Capped at the number of objects and clamped to a minimum of 1.
    .PARAMETER ThreadPerObject
        Creates a PSJob for each individual object, throttled to NumThreads threads. Without it objects are divided evenly between a fixed NumThreads threads.
    .PARAMETER StopProcessingJobsOnError
        If a job fails, it will not create any more jobs. It will still return any outstanding jobs, but it will not create any more.
        Only valid together with -ThreadPerObject; in batched mode an error is written and nothing is returned.
    .PARAMETER CleanupJobs
        Whether to explicitly call Remove-Job when calling Receive-Job as an attempt to manually manage memory.
    .OUTPUTS
        One result record (Object/Result/Success/Error) per processed object, or $null when there was nothing to process.
    .EXAMPLE
        $results = Invoke-Parallel2 -Objects @("a", "b") -Script { param($item, $arguments) "$item-$($arguments[0])" } -Arguments @("x") -ThreadPerObject
        $failed = $results | Where-Object { !$_.Success }
    .NOTES
        The arguments are passed to each job globally. If you want to pass different arguments to different jobs, format it into the object[] objects argument.
    #>
    param(
        [Parameter(Mandatory = $true)]
        [AllowNull()]
        [object[]]$Objects,
        [Parameter(Mandatory = $true)]
        [object]$Script,
        [Parameter(Mandatory = $false)]
        [object[]]$Arguments = $null,
        [Parameter(Mandatory = $false)]
        [int]$NumThreads = 8,
        [Parameter(Mandatory = $false)]
        [switch]$ThreadPerObject,
        [Parameter(Mandatory = $false)]
        [switch]$StopProcessingJobsOnError,
        [Parameter(Mandatory = $false)]
        [switch]$CleanupJobs
    )
    process {
        # Prefix for every warning/error message written by this function.
        $logLead = "[$($MyInvocation.MyCommand.Name)]"

        # Return if there are no elements to process.
        if (Test-IsCollectionNullOrEmpty $Objects) {
            return $null
        }

        # These are all the states that are "done" as in not doing anything.
        # They mirror the terminating states documented for Wait-Job:
        # https://learn.microsoft.com/en-us/powershell/module/microsoft.powershell.core/wait-job?view=powershell-5.1#description
        # "Suspended" should only occur for Workflows and "Disconnected" only for PSRemoting sessions; they are
        # listed to follow the documentation even though nothing special is done for them here.
        $terminatingJobStates = @(
            "Completed",
            "Failed",
            "Stopped",
            "Suspended",
            "Disconnected"
        )

        # For reference, the states that are not "done": Running, NotStarted, Stopping, Suspending.
        # Jobs in those states are simply left in the outstanding list.

        # Remaining members of the JobState enum that are neither running nor terminating:
        # https://learn.microsoft.com/en-us/dotnet/api/system.management.automation.jobstate?view=powershellsdk-1.1.0#fields
        # Jobs found in these states are logged, stopped and removed.
        $wonkyJobStates = @(
            "AtBreakpoint",
            "Blocked"
        )

        # Cap the number of threads at the number of objects, and never go below one
        # (a value of zero would otherwise divide by zero when sizing batches).
        $objectCount = $Objects.Count
        if ($NumThreads -gt $objectCount) {
            $NumThreads = $objectCount
        }
        if ($NumThreads -lt 1) {
            $NumThreads = 1
        }

        [array]$jobs = @()
        [array]$results = @()
        [array]$wonkyJobs = @()
        [array]$completedJobs = @()

        if ($ThreadPerObject.IsPresent) {
            #region OneObjectPerJob

            # Script executed inside each job. It is the same for every object, so build it once.
            $executeScript = {
                param($sb_object, $sb_script, $sb_arguments)
                # Deserialize script block, turn it into a script block again.
                $sb_script = [scriptblock]::Create($sb_script)
                # Create an object to record any output from the user script, and whether it completed execution.
                $sb_result = New-Object PSObject -Property @{
                    Object  = $sb_object
                    Result  = $null
                    Success = $true
                    Error   = $null
                }
                try {
                    $sb_result.Result = Invoke-Command -ScriptBlock $sb_script -NoNewScope -ArgumentList $sb_object, $sb_arguments
                } catch {
                    $sb_result.Success = $false
                    $sb_result.Error = $_
                }
                return $sb_result
            }

            # Create a PSJob for each object.
            $createMoreJobs = $true
            foreach ($object in $Objects) {
                # If we have hit the max number of concurrent jobs, wait for one to finish.
                if ( -not (Test-IsCollectionNullOrEmpty -Collection $jobs) -and ($jobs.Count -ge $NumThreads)) {
                    # Take the oldest job off the list and block on it. This waits on the first job by index rather
                    # than the first to finish, so "lumpy" workloads can delay the start of new jobs.
                    $firstJob = $jobs | Select-Object -First 1
                    $jobs = $jobs | Select-Object -Skip 1
                    $jobName = $firstJob.Name
                    $jobState = $firstJob.State
                    Write-Verbose "Receiving job named $jobName in state $jobState"
                    $jobResult = Receive-Job -Job $firstJob -Wait
                    # If we are not continuing on failure, don't create any more jobs.
                    if ($StopProcessingJobsOnError -and (!$jobResult.Success)) {
                        $createMoreJobs = $false
                    }
                    $results += $jobResult
                    if ($CleanupJobs) {
                        Write-Verbose "Removing job named $jobName"
                        Remove-Job -Job $firstJob -ErrorAction SilentlyContinue
                    }
                }

                # Scrub the jobs array of jobs that have finished, and receive their outputs.
                $completedJobs = $jobs.Where({
                        $_.State -in $terminatingJobStates
                    })
                $completedJobIds = $completedJobs.Id
                $wonkyJobs = $jobs.Where({
                        $_.State -in $wonkyJobStates
                    })
                $wonkyJobIds = $wonkyJobs.Id

                if ( -not (Test-IsCollectionNullOrEmpty -Collection $completedJobs)) {
                    foreach ($completedJob in $completedJobs) {
                        $jobName = $completedJob.Name
                        $jobState = $completedJob.State
                        Write-Verbose "Receiving job named $jobName in state $jobState"
                        $jobResult = Receive-Job -Job $completedJob -Wait
                        # If we are not continuing on failure, don't create any more jobs.
                        if ($StopProcessingJobsOnError -and (!$jobResult.Success)) {
                            $createMoreJobs = $false
                        }
                        $results += $jobResult
                        if ($CleanupJobs) {
                            Write-Verbose "Removing job named $jobName"
                            Remove-Job -Job $completedJob -ErrorAction SilentlyContinue
                        }
                        Write-Verbose "Done receiving job named $jobName in state $jobState"
                    }
                }

                if ( -not (Test-IsCollectionNullOrEmpty -Collection $wonkyJobs)) {
                    # Stop and remove jobs stuck in one of the $wonkyJobStates.
                    # Note: no result record is added to $results for these jobs.
                    foreach ($wonkyJob in $wonkyJobs) {
                        $jobName = $wonkyJob.Name
                        $jobState = $wonkyJob.State
                        Write-Warning "$logLead : Job named for object $jobName was in state $jobState - this is not recoverable"
                        Write-Warning "$logLead : Job data will be printed, job will be stopped, then removed. ErrorAction Continue is being forced."
                        Write-Warning "$logLead : We're all fine down here. How are you? ... Luke! We're gonna have company!"
                        # Render the job details to text and send them to the warning stream; a bare Format-List
                        # would emit formatting objects into this function's output alongside the results.
                        Write-Warning (Format-List -InputObject $wonkyJob -Property * -Force | Out-String)
                        Stop-Job -Job $wonkyJob -ErrorAction Continue
                        Remove-Job -Job $wonkyJob -ErrorAction Continue
                        Write-Warning "$logLead : Done Stopping and Removing job named for object $jobName"
                    }
                }

                # Repopulate the jobs array without Completed and Wonky jobs that have been Received and Removed, respectively.
                [array]$jobs = $jobs.Where({
                        $_.Id -notin $completedJobIds -and
                        $_.Id -notin $wonkyJobIds
                    })

                # Break out of the loop if something has failed, and we are not continuing on failure.
                if ($StopProcessingJobsOnError -and (!$createMoreJobs)) {
                    break
                }

                # Name the job after the object's Name property, or after the object itself when it is a string.
                # "-is [string]" is used so that a $null element does not throw.
                if (Test-StringIsNullOrWhitespace -Value $object.Name) {
                    if ($object -is [string]) {
                        $objectName = $object
                    } else {
                        $objectName = $null
                    }
                } else {
                    $objectName = $object.Name
                }

                # Start a new job.
                Write-Verbose "Starting job for object $objectName"
                $invokeArguments = @($object, $Script, $Arguments)
                $jobs += Start-Job -ScriptBlock $executeScript -ArgumentList $invokeArguments -Name $objectName
            }

            # Wait for all outstanding jobs to complete and collect results.
            if ( -not (Test-IsCollectionNullOrEmpty -Collection $jobs)) {
                foreach ($job in $jobs) {
                    $results += Receive-Job -Job $job -Wait
                    if ($CleanupJobs) {
                        Write-Verbose "Removing job named $($job.Name)"
                        Remove-Job -Job $job -ErrorAction SilentlyContinue
                    }
                }
            }
            #endregion OneObjectPerJob
        } else {
            #region BatchObjectsPerJob
            # Write an error if anyone is telling batched-parallelism to not-continue-on-failure.
            # We don't have a way of tracking failures and stopping them, because the separate threads cannot coordinate with each other.
            if ($StopProcessingJobsOnError) {
                Write-Error "$logLead : -StopProcessingJobsOnError is invalid for batched parallelism."
                return
            }

            # Create N threads, and give X/N objects to each thread session.
            # Define script that runs per thread.
            $batchScript = {
                param(
                    [object[]]$sb_objects,
                    [object]$sb_script,
                    [object[]]$sb_arguments
                )
                # Deserialize script block, turn it into a script block again.
                $sb_script = [scriptblock]::Create($sb_script)
                # Invoke user-provided script block on each object.
                foreach ($object in $sb_objects) {
                    # Create an object to record any output from the user script, and whether it completed execution.
                    $sb_result = New-Object PSObject -Property @{
                        Object  = $object
                        Result  = $null
                        Success = $true
                        Error   = $null
                    }
                    try {
                        $sb_result.Result = Invoke-Command -NoNewScope -ScriptBlock $sb_script -ArgumentList ($object, $sb_arguments)
                    } catch {
                        $sb_result.Success = $false
                        $sb_result.Error = $_
                    }
                    Write-Output $sb_result
                }
            }

            # Determine how many objects to allocate to each task. Round up to get odd outliers.
            $batchSize = [Math]::Ceiling($objectCount / $NumThreads)

            # Start each thread, and give each thread an allocation of objects.
            # Because of the round-up, trailing slices can be empty; those do not get a job.
            for ($i = 0; $i -lt $NumThreads; $i++) {
                $start = $i * $batchSize
                $end = (($i + 1) * $batchSize) - 1
                $objectRange = $Objects[$start..$end]
                if ($objectRange.Count -gt 0) {
                    $jobs += Start-Job -ScriptBlock $batchScript -ArgumentList ($objectRange, $Script, $Arguments)
                }
            }

            # Receive-Job to output the logs and collect the per-object results.
            foreach ($job in $jobs) {
                [array]$resultArray = Receive-Job -Job $job -Wait
                $results += $resultArray
                if ($CleanupJobs) {
                    Write-Verbose "Removing job named $($job.Name)"
                    Remove-Job -Job $job -ErrorAction SilentlyContinue
                }
            }
            #endregion BatchObjectsPerJob
        }

        #region ResultsAndOutput
        # Write out errors if there are any.
        # Write-Warning instead of throw so the caller can decide if they want this to stop script execution through the results.
        [array]$errorResults = $results | Where-Object { !$_.Success }
        if (!(Test-IsCollectionNullOrEmpty $errorResults)) {
            $errorString = "$($errorResults.Count) object(s) had errors.`n"
            $divider = "========================================================`n"
            foreach ($result in $errorResults) {
                $errorString += "$($divider)Object: $($result.Object)`nError: $($result.Error)`n"
            }
            $errorString += $divider
            Write-Warning $errorString
        }

        # Return the results that finished.
        return $results
        #endregion ResultsAndOutput
    }
}