346 lines
15 KiB
PowerShell
346 lines
15 KiB
PowerShell
|
function Invoke-Parallel {
|
|||
|
<#
|
|||
|
.SYNOPSIS
|
|||
|
Executes a script block against a list of objects in parallel with PSJobs.
|
|||
|
.PARAMETER Objects
|
|||
|
Objects to operate against, in parallel
|
|||
|
.PARAMETER Script
|
|||
|
The ScriptBlock to execute
|
|||
|
.PARAMETER Arguments
|
|||
|
Arguments (or parameters) to pass to the script block, "globally" so that each Object is operated on "equally"
|
|||
|
.PARAMETER ReturnObjects
|
|||
|
Whether to collect the return values from your Script and return it in an array at the end. It is YOUR responsibility
|
|||
|
to craft a ScriptBlock that returns data or return values that YOU can use.
|
|||
|
.PARAMETER ThreadPerObject
|
|||
|
Each object gets its own thread, instead of batched operation. Batched operation can save time in PSSession spin up and tear down.
|
|||
|
The more objects, the more significant this can be.
|
|||
|
.PARAMETER InitializationScript
|
|||
|
This is the same as Invoke-Command's InitializationScript parameter. Use it to prepare the session with parameters or functions that you need
|
|||
|
.PARAMETER ContinueOnFailure
|
|||
|
Whether to Stop on Errors when Receiving the Jobs that executed your Script. Defaults to $True. If you set this to $False, the
|
|||
|
first error in a Script will cause the rest of the Objects to not have the Script run against them. Be careful with this one.
|
|||
|
.PARAMETER CleanupJobs
|
|||
|
Whether to explicitly call Remove-Job when calling Receive-Job as an attempt to manually manage memory
|
|||
|
.NOTES
|
|||
|
The arguments are passed to each job globally. If you want to pass different arguments to different jobs, format it into the object[] objects argument.
|
|||
|
Use -returnObjects if you want the results of the jobs read back, ala with a return statement. Just be careful with Write-Output usage.
|
|||
|
#>
|
|||
|
[CmdletBinding()]
|
|||
|
param(
|
|||
|
[Parameter(Mandatory = $true)]
|
|||
|
[AllowNull()]
|
|||
|
[object[]]$Objects,
|
|||
|
[Parameter(Mandatory = $true)]
|
|||
|
[object]$Script,
|
|||
|
[Parameter(Mandatory = $false)]
|
|||
|
[object[]]$Arguments = $null,
|
|||
|
[Parameter(Mandatory = $false)]
|
|||
|
[int]$NumThreads = 8,
|
|||
|
[Parameter(Mandatory = $false)]
|
|||
|
[switch]$ReturnObjects,
|
|||
|
[Parameter(Mandatory = $false)]
|
|||
|
[switch]$ThreadPerObject,
|
|||
|
[Parameter(Mandatory = $false)]
|
|||
|
[ScriptBlock]$InitializationScript = $null,
|
|||
|
[Parameter(Mandatory = $false)]
|
|||
|
[bool]$ContinueOnFailure = $true,
|
|||
|
[Parameter(Mandatory = $false)]
|
|||
|
[switch]$CleanupJobs
|
|||
|
)
|
|||
|
process {
|
|||
|
$loglead = Get-LogLeadName
|
|||
|
# Return if there are no elements to process.
|
|||
|
if (Test-IsCollectionNullOrEmpty $objects) {
|
|||
|
if ($returnObjects) {
|
|||
|
return $null
|
|||
|
} else {
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
# These are all the states that are "done" as in not doing anything
|
|||
|
# I think "Suspended" should only be for Workflows
|
|||
|
# I think "Disconnected" should only be for PSRemoting sessions
|
|||
|
# those will come into play for Invoke-ParallelServers
|
|||
|
# and anywhere we use PSSessions explicitly or Invoke-Command -AsJob
|
|||
|
#
|
|||
|
# Why these? https://learn.microsoft.com/en-us/powershell/module/microsoft.powershell.core/wait-job?view=powershell-5.1#description
|
|||
|
# {quote}
|
|||
|
# The Wait-Job cmdlet waits for a job to be in a terminating state before continuing execution. The terminating states are:
|
|||
|
#
|
|||
|
# Completed
|
|||
|
# Failed
|
|||
|
# Stopped
|
|||
|
# Suspended
|
|||
|
# Disconnected
|
|||
|
# You can wait until a specified job, or all jobs are in a terminating state.
|
|||
|
# You can also set a maximum wait time for the job using the Timeout parameter, or use the Force parameter to wait for a job in the Suspended or Disconnected states.
|
|||
|
# {quote}
|
|||
|
#
|
|||
|
# really, I'd rather put Suspended and Disconnected somewhere else and handle them differently. But I'm not sure how.
|
|||
|
# Until I do, I'll follow the documentation.
|
|||
|
# Not that we can do anything with Suspended and Disconnected...
|
|||
|
|
|||
|
$terminatingJobStates = @(
|
|||
|
"Completed",
|
|||
|
"Failed",
|
|||
|
"Stopped",
|
|||
|
"Suspended",
|
|||
|
"Disconnected"
|
|||
|
)
|
|||
|
|
|||
|
|
|||
|
# These are all the states that are not "done"
|
|||
|
# some are close to done, but I am not sure they can be counted
|
|||
|
# how long does it take to get from Stopping to Stopped?
|
|||
|
# I don't know either
|
|||
|
$runningJobStates = @(
|
|||
|
"Running",
|
|||
|
"NotStarted",
|
|||
|
"Stopping",
|
|||
|
"Suspending"
|
|||
|
)
|
|||
|
|
|||
|
# Wait! Where did these things come from? They weren't in the other link!
|
|||
|
# Good catch. https://learn.microsoft.com/en-us/dotnet/api/system.management.automation.jobstate?view=powershellsdk-1.1.0#fields
|
|||
|
#
|
|||
|
# I really don't know what to do with these, but they exist, and can show up.
|
|||
|
# We shall start with "log and hope"
|
|||
|
$wonkyJobStates = @(
|
|||
|
"AtBreakpoint",
|
|||
|
"Blocked"
|
|||
|
)
|
|||
|
|
|||
|
# Cap the number of threads.
|
|||
|
$objectCount = $Objects.Count
|
|||
|
# I know this is non-standard. It pre-dates our standards. If I have time in this story, I'll try to fix it. TR - 20221012
|
|||
|
if ($NumThreads -gt $objectCount) {
|
|||
|
$NumThreads = $objectCount
|
|||
|
}
|
|||
|
|
|||
|
# Figure out what to do when something fails.
|
|||
|
$errorAction = if ($ContinueOnFailure) { "Continue" } else { "Stop" }
|
|||
|
|
|||
|
[array]$jobs = @()
|
|||
|
[array]$results = @()
|
|||
|
[array]$wonkyJobs = @()
|
|||
|
[array]$completedJobs = @()
|
|||
|
|
|||
|
# For each input object.
|
|||
|
if ($ThreadPerObject) {
|
|||
|
#region OneObjectPerJob
|
|||
|
# Create a PSJob for each object.
|
|||
|
foreach ($object in $Objects) {
|
|||
|
# Wait for any job to complete if there are any.
|
|||
|
# Also, jobs that end up "Blocked" will throw, here...
|
|||
|
# If we have hit the max number of concurrent jobs, wait.
|
|||
|
if ( -NOT (Test-IsCollectionNullOrEmpty -Collection $jobs) -and ($jobs.Count -ge $NumThreads)) {
|
|||
|
Write-Verbose "$loglead : maximum jobs running... wating for any job to complete..."
|
|||
|
Wait-Job -Job $jobs -Any | Out-Null
|
|||
|
}
|
|||
|
|
|||
|
# CHECK JOB STATES
|
|||
|
# Scrub the jobs array of jobs that have finished, and receive their outputs.
|
|||
|
# $runningJobs = $jobs | Where-Object { $_.State -in $runningJobStates };
|
|||
|
# $completedJobs = $jobs | Where-Object { $_.State -in $terminatingJobStates }
|
|||
|
$completedJobs = $jobs.Where({
|
|||
|
$_.State -in $terminatingJobStates
|
|||
|
})
|
|||
|
$completedJobIds = $completedJobs.Id
|
|||
|
$wonkyJobs = $jobs.Where({
|
|||
|
$_.State -in $wonkyJobStates
|
|||
|
})
|
|||
|
$wonkyJobIds = $wonkyJobs.Id
|
|||
|
|
|||
|
if (!(Test-IsCollectionNullOrEmpty $completedJobs)) {
|
|||
|
|
|||
|
foreach ($completedJob in $completedJobs) {
|
|||
|
$jobName = $completedJob.Name
|
|||
|
$jobState = $completedJob.State
|
|||
|
Write-Verbose "Receiving job named for object $jobName in state $jobState"
|
|||
|
if ($ReturnObjects) {
|
|||
|
$results += Receive-Job -Job $completedJob -ErrorAction $errorAction
|
|||
|
|
|||
|
} else {
|
|||
|
Receive-Job -Job $completedJob -ErrorAction $errorAction
|
|||
|
|
|||
|
}
|
|||
|
if ($CleanupJobs) {
|
|||
|
Write-Verbose "Removing job named for object $jobName"
|
|||
|
Remove-Job -Job $completedJob -ErrorAction SilentlyContinue
|
|||
|
}
|
|||
|
Write-Verbose "Done receiving job named for object $jobName in state $jobState"
|
|||
|
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
if (-NOT (Test-IsCollectionNullOrEmpty -Collection $wonkyJobs)) {
|
|||
|
# Stop and Remove WONKY jobs
|
|||
|
# where "wonky" is in the list above
|
|||
|
foreach ($wonkyJob in $wonkyJobs) {
|
|||
|
$jobName = $wonkyJob.Name
|
|||
|
$jobState = $wonkyJob.State
|
|||
|
Write-Warning "$loglead : Job named for object $jobName was in state $jobState - this is not recoverable"
|
|||
|
Write-Warning "$loglead : Job data will be printed, job will be stopped, then removed. ErrorAction Continue is being forced."
|
|||
|
Write-Warning "$loglead : We're all fine down here. How are you? ... Luke! We're gonna have company!"
|
|||
|
Format-List -InputObject $wonkyJob -Property * -Force
|
|||
|
Stop-Job -Job $wonkyJob -ErrorAction Continue
|
|||
|
Remove-Job -Job $wonkyJob -ErrorAction Continue
|
|||
|
Write-Warning "$loblead : Done Stopping and Removing job named for object $jobName"
|
|||
|
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
# Repopulate the jobs array without Completed and Wonky jobs that have been Received and Removed, respectively
|
|||
|
[array]$jobs = $jobs.Where({
|
|||
|
$_.Id -notin $completedJobIds -and
|
|||
|
$_.Id -notin $wonkyJobIds
|
|||
|
})
|
|||
|
|
|||
|
# Start a new job.
|
|||
|
if (Test-StringIsNullOrWhitespace -Value $object.Name) {
|
|||
|
if ($object.GetType().Name -eq "String") {
|
|||
|
$objectName = $object
|
|||
|
} else {
|
|||
|
$objectName = $null
|
|||
|
}
|
|||
|
} else {
|
|||
|
$objectName = $object.Name
|
|||
|
}
|
|||
|
Write-Verbose "Starting job for object $objectName"
|
|||
|
$jobs += Start-Job -Name $objectName -ScriptBlock $Script -ArgumentList $object, $Arguments -InitializationScript $InitializationScript
|
|||
|
}
|
|||
|
|
|||
|
# Another round of Wonky Job cleanup
|
|||
|
$wonkyJobs = $jobs.Where({
|
|||
|
$_.State -in $wonkyJobStates
|
|||
|
})
|
|||
|
$wonkyJobIds = $wonkyJobs.Id
|
|||
|
|
|||
|
if (-NOT (Test-IsCollectionNullOrEmpty -Collection $wonkyJobs)) {
|
|||
|
# Stop and Remove WONKY jobs
|
|||
|
# where "wonky" is in the list above
|
|||
|
foreach ($wonkyJob in $wonkyJobs) {
|
|||
|
$jobName = $wonkyJob.Name
|
|||
|
$jobState = $wonkyJob.State
|
|||
|
Write-Warning "$loglead : Job named for object $jobName was in state $jobState - this is not recoverable"
|
|||
|
Write-Warning "$loglead : Job data will be printed, job will be stopped, then removed. ErrorAction Continue is being forced."
|
|||
|
Write-Warning "$loglead : We're all fine down here. How are you? ... Luke! We're gonna have company!"
|
|||
|
Format-List -InputObject $wonkyJob -Property * -Force
|
|||
|
Stop-Job -Job $wonkyJob -ErrorAction Continue
|
|||
|
Remove-Job -Job $wonkyJob -ErrorAction Continue
|
|||
|
Write-Warning "$loblead : Done Stopping and Removing job named for object $jobName"
|
|||
|
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
# Repopulate the jobs array without Wonky jobs that have been Removed
|
|||
|
[array]$jobs = $jobs.Where({
|
|||
|
$_.Id -notin $wonkyJobIds
|
|||
|
})
|
|||
|
|
|||
|
# Wait for all outstanding jobs to complete.
|
|||
|
Write-Verbose "Waiting for jobs to complete..."
|
|||
|
Wait-Job -Job $jobs | Out-Null
|
|||
|
|
|||
|
# If we want to return the output stream from jobs in a list.
|
|||
|
foreach ($job in $jobs) {
|
|||
|
$jobName = $job.Name
|
|||
|
$jobState = $job.State
|
|||
|
Write-Verbose "Receiving job named for object $jobName in state $jobState"
|
|||
|
if ($ReturnObjects) {
|
|||
|
$results += Receive-Job -Job $job -ErrorAction $errorAction
|
|||
|
|
|||
|
} else {
|
|||
|
Receive-Job -Job $job -ErrorAction $errorAction
|
|||
|
|
|||
|
}
|
|||
|
|
|||
|
if ($CleanupJobs) {
|
|||
|
Write-Verbose "Removing job named for object $jobName"
|
|||
|
Remove-Job -Job $job -ErrorAction SilentlyContinue
|
|||
|
}
|
|||
|
Write-Verbose "Done receiving job named for object $jobName in state $jobState"
|
|||
|
|
|||
|
}
|
|||
|
if ($ReturnObjects) {
|
|||
|
return $results
|
|||
|
}
|
|||
|
|
|||
|
#endregion OneObjectPerJob
|
|||
|
|
|||
|
} else {
|
|||
|
#region BatchObjectsPerJob
|
|||
|
# Create N threads, and give X/N objects to each thread session.
|
|||
|
|
|||
|
# Define script that runs per thread.
|
|||
|
$batchScript = {
|
|||
|
param(
|
|||
|
[object[]]$objects,
|
|||
|
[object]$script,
|
|||
|
[object[]]$arguments
|
|||
|
)
|
|||
|
|
|||
|
# Deserialize script block, turn it into a script block again.
|
|||
|
$script = [scriptblock]::Create($script)
|
|||
|
|
|||
|
# Invoke user-provided script block on each object.
|
|||
|
# SRE-13225 - The ErrorAction on Invoke-Command does NOT affect what happens INSIDE
|
|||
|
# the ScriptBlock. Because we're batching things to be parallelized on "shared threads"
|
|||
|
# this ErrorAction allows us to have a failure in the middle of a batch without
|
|||
|
# halting the entire batch.
|
|||
|
foreach ($object in $objects) {
|
|||
|
Invoke-Command -ErrorAction Continue -ScriptBlock $script -ArgumentList ($object, $arguments)
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
# Determine how many objects to allocate to each task. Round up to get odd outliers.
|
|||
|
$batchSize = [Math]::Ceiling($objectCount / $NumThreads);
|
|||
|
|
|||
|
# Start each thread, and give each thread an allocation of objects.
|
|||
|
for ($i = 0; $i -lt $numThreads; $i++) {
|
|||
|
$start = $i * $batchSize
|
|||
|
$end = (($i + 1) * $batchSize) - 1
|
|||
|
|
|||
|
$objectRange = $Objects[$start..$end]
|
|||
|
|
|||
|
if ($objectRange.Count -gt 0) {
|
|||
|
$batchName = "Batch_$($i)"
|
|||
|
|
|||
|
$jobs += Start-Job -Name $batchName -ScriptBlock $batchScript -ArgumentList ($objectRange, $Script, $Arguments) -InitializationScript $InitializationScript
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
# Wait for all jobs to complete.
|
|||
|
Write-Verbose "Waiting for jobs to complete..."
|
|||
|
Wait-Job -Job $jobs | Out-Null
|
|||
|
|
|||
|
foreach ($job in $jobs) {
|
|||
|
$jobName = $job.Name
|
|||
|
$jobState = $job.State
|
|||
|
Write-Verbose "Receiving job named for object $jobName in state $jobState"
|
|||
|
if ($ReturnObjects) {
|
|||
|
$results += Receive-Job -Job $job -ErrorAction $errorAction
|
|||
|
|
|||
|
} else {
|
|||
|
Receive-Job -Job $job -ErrorAction $errorAction
|
|||
|
|
|||
|
}
|
|||
|
|
|||
|
if ($CleanupJobs) {
|
|||
|
Write-Verbose "Removing job named for object $jobName"
|
|||
|
Remove-Job -Job $job -ErrorAction SilentlyContinue
|
|||
|
}
|
|||
|
Write-Verbose "Done receiving job named for object $jobName in state $jobState"
|
|||
|
|
|||
|
}
|
|||
|
|
|||
|
# If we want to return the output stream from jobs in a list.
|
|||
|
if ($ReturnObjects) {
|
|||
|
return $results
|
|||
|
}
|
|||
|
|
|||
|
#endregion BatchObjectsPerJob
|
|||
|
}
|
|||
|
}
|
|||
|
}
|