ps/Modules/Alkami.PowerShell.Common/Public/Invoke-Parallel.ps1
2023-05-30 22:51:22 -07:00

346 lines
15 KiB
PowerShell

function Invoke-Parallel {
    <#
    .SYNOPSIS
    Executes a script block against a list of objects in parallel with PSJobs.
    .DESCRIPTION
    Runs the supplied Script once per element of Objects using background jobs (Start-Job).
    By default the objects are split into NumThreads batches and each batch is processed by a single job.
    With -ThreadPerObject every object gets its own job, with at most NumThreads jobs in flight at once.
    In both modes the Script is invoked with two positional arguments: the current object, then the Arguments array.
    .PARAMETER Objects
    Objects to operate against, in parallel
    .PARAMETER Script
    The ScriptBlock to execute
    .PARAMETER Arguments
    Arguments (or parameters) to pass to the script block, "globally" so that each Object is operated on "equally"
    .PARAMETER NumThreads
    The maximum number of jobs to run concurrently. Defaults to 8. It is capped at the number of Objects,
    and values lower than 1 are treated as 1.
    .PARAMETER ReturnObjects
    Whether to collect the return values from your Script and return it in an array at the end. It is YOUR responsibility
    to craft a ScriptBlock that returns data or return values that YOU can use.
    .PARAMETER ThreadPerObject
    Each object gets its own thread, instead of batched operation. Batched operation can save time in PSSession spin up and tear down.
    The more objects, the more significant this can be.
    .PARAMETER InitializationScript
    This is the same as Start-Job's InitializationScript parameter. Use it to prepare the session with parameters or functions that you need
    .PARAMETER ContinueOnFailure
    Whether to Stop on Errors when Receiving the Jobs that executed your Script. Defaults to $True. If you set this to $False, the
    first error in a Script will cause the rest of the Objects to not have the Script run against them. Be careful with this one.
    .PARAMETER CleanupJobs
    Whether to explicitly call Remove-Job when calling Receive-Job as an attempt to manually manage memory
    .EXAMPLE
    Invoke-Parallel -Objects @("a", "b", "c") -Script { param($item, $sharedArguments) "Processed $item" } -ReturnObjects
    .NOTES
    The arguments are passed to each job globally. If you want to pass different arguments to different jobs, format it into the object[] objects argument.
    Use -returnObjects if you want the results of the jobs read back, ala with a return statement. Just be careful with Write-Output usage.
    #>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory = $true)]
        [AllowNull()]
        [object[]]$Objects,
        [Parameter(Mandatory = $true)]
        [object]$Script,
        [Parameter(Mandatory = $false)]
        [object[]]$Arguments = $null,
        [Parameter(Mandatory = $false)]
        [int]$NumThreads = 8,
        [Parameter(Mandatory = $false)]
        [switch]$ReturnObjects,
        [Parameter(Mandatory = $false)]
        [switch]$ThreadPerObject,
        [Parameter(Mandatory = $false)]
        [ScriptBlock]$InitializationScript = $null,
        [Parameter(Mandatory = $false)]
        [bool]$ContinueOnFailure = $true,
        [Parameter(Mandatory = $false)]
        [switch]$CleanupJobs
    )
    process {
        $loglead = Get-LogLeadName

        # Return if there are no elements to process.
        if (Test-IsCollectionNullOrEmpty $Objects) {
            if ($ReturnObjects) {
                return $null
            } else {
                return
            }
        }

        #region PrivateHelpers
        # Receives the output of a single finished job and optionally removes the job afterwards.
        # The job's output is emitted to the caller of this helper, which decides whether to collect it.
        function Receive-ParallelJob {
            param(
                [object]$Job,
                [string]$ReceiveErrorAction,
                [bool]$RemoveAfterReceive
            )
            $jobName = $Job.Name
            $jobState = $Job.State
            Write-Verbose "Receiving job named for object $jobName in state $jobState"
            Receive-Job -Job $Job -ErrorAction $ReceiveErrorAction
            if ($RemoveAfterReceive) {
                Write-Verbose "Removing job named for object $jobName"
                Remove-Job -Job $Job -ErrorAction SilentlyContinue
            }
            Write-Verbose "Done receiving job named for object $jobName in state $jobState"
        }

        # Logs, stops and removes jobs that are in a state we cannot recover from.
        # Everything is written to the warning stream so that nothing leaks into the function's output.
        function Remove-WonkyJob {
            param(
                [object[]]$WonkyJobs,
                [string]$LogLead
            )
            foreach ($wonkyJob in $WonkyJobs) {
                $jobName = $wonkyJob.Name
                $jobState = $wonkyJob.State
                Write-Warning "$LogLead : Job named for object $jobName was in state $jobState - this is not recoverable"
                Write-Warning "$LogLead : Job data will be printed, job will be stopped, then removed. ErrorAction Continue is being forced."
                Write-Warning "$LogLead : We're all fine down here. How are you? ... Luke! We're gonna have company!"
                # Render the job details to text; emitting Format-List output directly would mix
                # formatting objects into the values returned to the caller.
                $jobDetails = Format-List -InputObject $wonkyJob -Property * -Force | Out-String
                Write-Warning "$LogLead : $jobDetails"
                Stop-Job -Job $wonkyJob -ErrorAction Continue
                Remove-Job -Job $wonkyJob -ErrorAction Continue
                Write-Warning "$LogLead : Done Stopping and Removing job named for object $jobName"
            }
        }
        #endregion PrivateHelpers

        # These are all the states that are "done" as in not doing anything
        # I think "Suspended" should only be for Workflows
        # I think "Disconnected" should only be for PSRemoting sessions
        # those will come into play for Invoke-ParallelServers
        # and anywhere we use PSSessions explicitly or Invoke-Command -AsJob
        #
        # Why these? https://learn.microsoft.com/en-us/powershell/module/microsoft.powershell.core/wait-job?view=powershell-5.1#description
        # {quote}
        # The Wait-Job cmdlet waits for a job to be in a terminating state before continuing execution. The terminating states are:
        #
        # Completed
        # Failed
        # Stopped
        # Suspended
        # Disconnected
        # You can wait until a specified job, or all jobs are in a terminating state.
        # You can also set a maximum wait time for the job using the Timeout parameter, or use the Force parameter to wait for a job in the Suspended or Disconnected states.
        # {quote}
        #
        # really, I'd rather put Suspended and Disconnected somewhere else and handle them differently. But I'm not sure how.
        # Until I do, I'll follow the documentation.
        # Not that we can do anything with Suspended and Disconnected...
        $terminatingJobStates = @(
            "Completed",
            "Failed",
            "Stopped",
            "Suspended",
            "Disconnected"
        )

        # For reference, the states that are not "done" are: Running, NotStarted, Stopping, Suspending.
        # Some are close to done, but it is not clear they can be counted as such
        # (how long does it take to get from Stopping to Stopped?). Nothing filters on them, so they are not kept in a variable.

        # Wait! Where did these things come from? They weren't in the other link!
        # Good catch. https://learn.microsoft.com/en-us/dotnet/api/system.management.automation.jobstate?view=powershellsdk-1.1.0#fields
        #
        # I really don't know what to do with these, but they exist, and can show up.
        # We shall start with "log and hope"
        $wonkyJobStates = @(
            "AtBreakpoint",
            "Blocked"
        )

        # Cap the number of threads at the number of objects...
        $objectCount = $Objects.Count
        if ($NumThreads -gt $objectCount) {
            $NumThreads = $objectCount
        }
        # ...and never go below one: zero would divide by zero when batching, and a negative
        # value would start no jobs at all.
        if ($NumThreads -lt 1) {
            $NumThreads = 1
        }

        # Figure out what to do when something fails.
        $errorAction = if ($ContinueOnFailure) { "Continue" } else { "Stop" }

        [array]$jobs = @()
        [array]$results = @()
        [array]$wonkyJobs = @()
        [array]$completedJobs = @()

        if ($ThreadPerObject) {
            #region OneObjectPerJob
            # Create a PSJob for each object.
            foreach ($object in $Objects) {
                # If we have hit the max number of concurrent jobs, wait for any job to complete.
                # Also, jobs that end up "Blocked" will throw, here...
                if ( -NOT (Test-IsCollectionNullOrEmpty -Collection $jobs) -and ($jobs.Count -ge $NumThreads)) {
                    Write-Verbose "$loglead : maximum jobs running... wating for any job to complete..."
                    Wait-Job -Job $jobs -Any | Out-Null
                }

                # CHECK JOB STATES
                # Scrub the jobs array of jobs that have finished, and receive their outputs.
                $completedJobs = $jobs.Where({
                        $_.State -in $terminatingJobStates
                    })
                $completedJobIds = $completedJobs.Id
                $wonkyJobs = $jobs.Where({
                        $_.State -in $wonkyJobStates
                    })
                $wonkyJobIds = $wonkyJobs.Id

                if (!(Test-IsCollectionNullOrEmpty $completedJobs)) {
                    foreach ($completedJob in $completedJobs) {
                        if ($ReturnObjects) {
                            $results += Receive-ParallelJob -Job $completedJob -ReceiveErrorAction $errorAction -RemoveAfterReceive $CleanupJobs
                        } else {
                            Receive-ParallelJob -Job $completedJob -ReceiveErrorAction $errorAction -RemoveAfterReceive $CleanupJobs
                        }
                    }
                }

                if (-NOT (Test-IsCollectionNullOrEmpty -Collection $wonkyJobs)) {
                    # Stop and Remove WONKY jobs, where "wonky" is in the list above
                    Remove-WonkyJob -WonkyJobs $wonkyJobs -LogLead $loglead
                }

                # Repopulate the jobs array without Completed and Wonky jobs that have been Received and Removed, respectively
                [array]$jobs = $jobs.Where({
                        $_.Id -notin $completedJobIds -and
                        $_.Id -notin $wonkyJobIds
                    })

                # Work out a job name: the object's Name property, the object itself when it is a string, otherwise nothing.
                if (Test-StringIsNullOrWhitespace -Value $object.Name) {
                    # -is is null-safe, unlike calling GetType() on a $null element.
                    if ($object -is [string]) {
                        $objectName = $object
                    } else {
                        $objectName = $null
                    }
                } else {
                    $objectName = $object.Name
                }

                # Start a new job.
                Write-Verbose "Starting job for object $objectName"
                $jobs += Start-Job -Name $objectName -ScriptBlock $Script -ArgumentList $object, $Arguments -InitializationScript $InitializationScript
            }

            # Another round of Wonky Job cleanup
            $wonkyJobs = $jobs.Where({
                    $_.State -in $wonkyJobStates
                })
            $wonkyJobIds = $wonkyJobs.Id
            if (-NOT (Test-IsCollectionNullOrEmpty -Collection $wonkyJobs)) {
                Remove-WonkyJob -WonkyJobs $wonkyJobs -LogLead $loglead
            }

            # Repopulate the jobs array without Wonky jobs that have been Removed
            [array]$jobs = $jobs.Where({
                    $_.Id -notin $wonkyJobIds
                })

            # Wait for all outstanding jobs to complete.
            # Skip the wait when every remaining job was wonky and has been removed:
            # Wait-Job rejects an empty job collection.
            Write-Verbose "Waiting for jobs to complete..."
            if ($jobs.Count -gt 0) {
                Wait-Job -Job $jobs | Out-Null
            }

            foreach ($job in $jobs) {
                if ($ReturnObjects) {
                    $results += Receive-ParallelJob -Job $job -ReceiveErrorAction $errorAction -RemoveAfterReceive $CleanupJobs
                } else {
                    Receive-ParallelJob -Job $job -ReceiveErrorAction $errorAction -RemoveAfterReceive $CleanupJobs
                }
            }
            #endregion OneObjectPerJob
        } else {
            #region BatchObjectsPerJob
            # Create N threads, and give X/N objects to each thread session.
            # Define script that runs per thread.
            $batchScript = {
                param(
                    [object[]]$objects,
                    [object]$script,
                    [object[]]$arguments
                )
                # Deserialize script block, turn it into a script block again.
                $script = [scriptblock]::Create($script)
                # Invoke user-provided script block on each object.
                # SRE-13225 - The ErrorAction on Invoke-Command does NOT affect what happens INSIDE
                # the ScriptBlock. Because we're batching things to be parallelized on "shared threads"
                # this ErrorAction allows us to have a failure in the middle of a batch without
                # halting the entire batch.
                foreach ($object in $objects) {
                    Invoke-Command -ErrorAction Continue -ScriptBlock $script -ArgumentList ($object, $arguments)
                }
            }

            # Determine how many objects to allocate to each task. Round up to get odd outliers.
            $batchSize = [Math]::Ceiling($objectCount / $NumThreads)

            # Start each thread, and give each thread an allocation of objects.
            for ($i = 0; $i -lt $NumThreads; $i++) {
                $start = $i * $batchSize
                $end = (($i + 1) * $batchSize) - 1
                # Indexes past the end of the array are ignored, so trailing batches may be short or empty.
                $objectRange = $Objects[$start..$end]
                if ($objectRange.Count -gt 0) {
                    $batchName = "Batch_$($i)"
                    $jobs += Start-Job -Name $batchName -ScriptBlock $batchScript -ArgumentList ($objectRange, $Script, $Arguments) -InitializationScript $InitializationScript
                }
            }

            # Wait for all jobs to complete.
            Write-Verbose "Waiting for jobs to complete..."
            if ($jobs.Count -gt 0) {
                Wait-Job -Job $jobs | Out-Null
            }

            foreach ($job in $jobs) {
                if ($ReturnObjects) {
                    $results += Receive-ParallelJob -Job $job -ReceiveErrorAction $errorAction -RemoveAfterReceive $CleanupJobs
                } else {
                    Receive-ParallelJob -Job $job -ReceiveErrorAction $errorAction -RemoveAfterReceive $CleanupJobs
                }
            }
            #endregion BatchObjectsPerJob
        }

        # If we want to return the output stream from jobs in a list.
        if ($ReturnObjects) {
            return $results
        }
    }
}