I am working on a solution that uses Azure Durable Functions to run multiple parallel tasks with the ExchangeOnlineManagement module, collecting the Unified Audit Log, in order to speed up some long-running operations. Here is my current setup and challenge:
Overall Goal:
- I want to use Azure Durable Functions to run activities in parallel, leveraging the ExchangeOnlineManagement module.
- However, the ExchangeOnlineManagement module only allows about 3-5 concurrent sessions at a time.
- Therefore, I need a way to make sure I never exceed this session limit while still running as many activities concurrently as possible to speed up processing.
Current Workflow:
- HTTP Starter: I trigger the workflow with an HTTP request.
- Orchestrator: This function is designed to (see the skeleton sketched after this list):
  - Create "chunks" of tasks that can run concurrently.
  - Limit the number of concurrent sessions to respect the Connect-ExchangeOnline restriction.
  - Wait for a task to complete before starting a new one once the maximum concurrent limit is reached.
- Activities: These perform the actual tasks. (This part works.)
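For orientation, the orchestrator follows the standard PowerShell Durable Functions shape. A minimal skeleton, with the real logic elided, looks roughly like this ($Context is the orchestration trigger binding, and $Context.Input is where the $input used in the snippets below comes from):
param($Context)

# Payload forwarded by the HTTP starter (startDate, endDate, interval, tenantId, ...)
$input = $Context.Input

# Kept at 2 for testing; 3-5 in production to stay under the Exchange Online session limit
$maxParallelTasks = 2

# 1. Build $timeChunks from $input (first snippet below)
# 2. Fan out activities, at most $maxParallelTasks at a time (second snippet below)
# 3. Collect the combined results in $finalOutput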
The challenge I am facing is how to effectively manage the parallelism in the orchestrator, such that it runs a maximum of 3-5 activities concurrently, waits for any of them to complete, and then starts a new one while maintaining an efficient workflow.
My issue
Where I'm stuck is trying to run tasks in parallel, but only 2-3 at a time, ensuring that I wait for one task to complete before starting the next. To track progress, I store the number of chunks to be processed, along with the counts for completed and remaining chunks, in variables. However, every time the orchestrator function restarts, these variables reset, causing all progress to be lost.
I have tried several approaches, such as using Set-DurableCustomStatus and creating script-level or global variables, only setting them when the orchestrator is not replaying, but none of these have worked to maintain state across orchestrator replays.
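To make that concrete, the variable-based attempt looked roughly like this (simplified):
# Attempt: keep progress counters in script scope inside the orchestrator
if (-not $script:progress) {
    $script:progress = @{
        Completed = @()                 # chunk numbers that have finished
        Remaining = $timeChunks.Count
    }
}

# ...later, whenever a chunk completes...
$script:progress.Completed += $completedChunkNumber
$script:progress.Remaining -= 1

# Problem: every replay of the orchestrator is a fresh invocation,
# so $script:progress comes back empty and the counts are lost.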
How would I keep track of the progress in my Orchestrator, or is this not something you should do in an Orchestrator? Or are there better solutions available, like storing state in an Azure Queue?
How my code currently works
My HTTP starter is triggered with a start date, end date, and interval. Based on this information, the orchestrator calculates several time chunks. Currently, for testing purposes, I'm using a 1-day period with an interval of 240 minutes, resulting in 6 chunks. The idea is to trigger an activity for each of the 6 chunks, but only run 2 activities at the same time in my testing setup.
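The starter itself is just the standard Durable Functions HTTP client pattern; simplified, it looks roughly like this (the payload matches the "Sending input" entry in the logs further down):
param($Request, $TriggerMetadata)

# Forward the request body (startDate, endDate, interval, tenantId, ...) to the orchestrator
$body = $Request.Body
Write-Information "Sending input: $($body | ConvertTo-Json)"

$instanceId = Start-NewOrchestration -FunctionName 'Get-UAL-Orchestrator' -InputObject $body

# Return the standard status-query response to the caller
$response = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $instanceId
Push-OutputBinding -Name Response -Value $response
Inside the orchestrator, the time chunks are then built like this: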
$timeChunks = @()
$currentStart = [DateTime]::Parse($input.startDate)
$endDate = [DateTime]::Parse($input.endDate)

while ($currentStart -lt $endDate) {
    $currentEnd = $currentStart.AddMinutes($input.interval)
    if ($currentEnd -gt $endDate) {
        $currentEnd = $endDate
    }
    $timeChunks += @{
        startDate   = $currentStart.ToString("yyyy-MM-ddTHH:mm:ss")
        endDate     = $currentEnd.ToString("yyyy-MM-ddTHH:mm:ss")
        tenantId    = $input.tenantId
        maxResults  = 5000
        chunkNumber = $timeChunks.Count
    }
    $currentStart = $currentEnd
}
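For my test run (a 1-day window with a 240-minute interval), that works out to 1440 / 240 = 6 chunks of 4 hours each, numbered 0 through 5.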
In the next section, I initiate two parallel tasks and then wait for one to complete. Once a task completes, it is recorded in $completedChunkNumber, which I use to track which chunks are finished and which are still pending. However, every time the orchestrator restarts, the state resets to an empty array, resulting in all progress being lost.
# Process chunks while maintaining 2 parallel tasks
while ($activeTasks.Count -gt 0 -or $remainingChunks.Count -gt 0) {
    Write-Host "Active tasks: $($activeTasks.Count), Remaining chunks: $($remainingChunks.Count)"

    # Start new tasks if needed
    while ($activeTasks.Count -lt $maxParallelTasks -and $remainingChunks.Count -gt 0) {
        $chunk = $remainingChunks.Dequeue()
        Write-Host "Starting new task for chunk $($chunk.chunkNumber)"
        $task = Invoke-DurableActivity -FunctionName "Get-UAL-Activity" -Input $chunk -NoWait
        $activeTasks[$chunk.chunkNumber] = @{
            Task  = $task
            Chunk = $chunk
        }
    }

    if ($activeTasks.Count -gt 0) {
        $tasks = @($activeTasks.Values | ForEach-Object { $_.Task })
        $completedTask = Wait-DurableTask -Task $tasks -Any

        # Find completed chunk
        $completedEntry = $activeTasks.GetEnumerator() |
            Where-Object { $_.Value.Task.TaskId -eq $completedTask.TaskId } |
            Select-Object -First 1
        $completedChunkNumber = $completedEntry.Key
        $originalChunk = $completedEntry.Value.Chunk
        Write-Host "Processing completion for chunk $completedChunkNumber"

        # Process result
        $result = Get-DurableTaskResult -Task $completedTask
        Write-Host "Got result: $($result | ConvertTo-Json)"

        # Add to appropriate output collection
        if ($result.Status -eq "Success") {
            $finalOutput.Success += $result
            Write-Host "Added successful result for chunk $($result.ChunkNumber)"
        }
        else {
            $finalOutput.Failed += $result
            Write-Host "Added failed result for chunk $($result.ChunkNumber)"
        }
        $finalOutput.ProcessedChunks += $completedChunkNumber

        # Remove completed task
        $activeTasks.Remove($completedChunkNumber)
    }
}
I attempted to keep track of progress by using Set-DurableCustomStatus, but I'm unsure how to retrieve and use this status within the orchestrator itself, as its primary purpose seems to be external status checking. How can I track progress within the orchestrator without losing it on every replay?
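For reference, the custom-status attempt was along these lines (simplified):
# Publish progress as the orchestration's custom status after each completed chunk
Set-DurableCustomStatus -CustomStatus @{
    TotalChunks     = $timeChunks.Count
    ProcessedChunks = $finalOutput.ProcessedChunks
    Remaining       = $remainingChunks.Count
}

# This shows up in the customStatus field of the HTTP status-query endpoint,
# but I found no way to read it back inside the orchestrator on the next replay.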
Azure Function Logs
My HTTP trigger calls the Orchestrator with the following output:
2024-11-18T10:15:21Z [Information] INFORMATION: Sending input: {
"endDate": "2024-11-18T10:14:59",
"timeoutMinutes": 6000,
"tenantId": "blabla-ebfa12e7a31c",
"startDate": "2024-11-17T10:14:59",
"interval": 240
}
Then the Orchestrator calculates the chunks and starts tasks 0 and 1. It then gets the results for both of them before starting over and initiating tasks 0 and 1 again:
2024-11-18T10:16:23Z [Information] INFORMATION: ================ ORCHESTRATOR START ================
2024-11-18T10:16:23Z [Information] INFORMATION: Created 6 chunks to process
2024-11-18T10:16:23Z [Information] INFORMATION: Processing chunks with max 2 parallel tasks
2024-11-18T10:16:23Z [Information] INFORMATION: Active tasks: 0, Remaining chunks: 6
2024-11-18T10:16:23Z [Information] INFORMATION: Starting new task for chunk 0
2024-11-18T10:16:23Z [Information] INFORMATION: Starting new task for chunk 1
2024-11-18T10:16:23Z [Information] INFORMATION: Processing completion for chunk 1
2024-11-18T10:16:23Z [Information] INFORMATION: Got result: {
"Status": "Success",
"EndDate": "2024-11-17T14:14:59",
"HasData": true,
"StartDate": "2024-11-17T10:14:59",
"BlobName": "Chuck-0.csv",
"ChunkNumber": 0,
"RecordsRetrieved": 586
}
2024-11-18T10:16:23Z [Information] INFORMATION: Added successful result for chunk 1
2024-11-18T10:16:23Z [Information] INFORMATION: Active tasks: 1, Remaining chunks: 4
2024-11-18T10:16:23Z [Information] INFORMATION: Starting new task for chunk 2
2024-11-18T10:16:23Z [Information] INFORMATION: Processing completion for chunk 2
2024-11-18T10:16:23Z [Information] INFORMATION: Got result: {
"Status": "Success",
"EndDate": "2024-11-17T14:14:59",
"HasData": true,
"StartDate": "2024-11-17T10:14:59",
"BlobName": "Chuck-0.csv",
"ChunkNumber": 0,
"RecordsRetrieved": 586
}
2024-11-18T10:16:23Z [Information] INFORMATION: Added successful result for chunk 2
2024-11-18T10:16:23Z [Information] INFORMATION: Active tasks: 1, Remaining chunks: 3
2024-11-18T10:16:23Z [Information] INFORMATION: Starting new task for chunk 3
2024-11-18T10:16:23Z [Information] cacb3bf3-6042-4050-94af-dd28682f9f4e: Function 'Get-UAL-Activity (Activity)' scheduled. Reason: Get-UAL-Orchestrator. IsReplay: False. State: Scheduled. RuntimeStatus: Pending. HubName: AzureLogs. AppName: AzureLogs. SlotName: Production. ExtensionVersion: 2.13.4. SequenceNumber: 15.
2024-11-18T10:16:23Z [Information] Executed 'Functions.Get-UAL-Orchestrator' (Succeeded, Id=a989d96f-a98b-47ea-a9c2-6d407fdc5ee8, Duration=35ms)
2024-11-18T10:16:29Z [Information] Executing 'Functions.Get-UAL-Orchestrator' (Reason='(null)', Id=892258ce-29f7-4ded-9be1-5060c7fad9fb)
2024-11-18T10:16:29Z [Verbose] Sending invocation id: '892258ce-29f7-4ded-9be1-5060c7fad9fb
2024-11-18T10:16:29Z [Verbose] Posting invocation id:892258ce-29f7-4ded-9be1-5060c7fad9fb on workerId:b273c3af-20e0-4365-b70d-218b4c6e49ea
2024-11-18T10:16:29Z [Information] INFORMATION: ================ ORCHESTRATOR START ================
2024-11-18T10:16:29Z [Information] INFORMATION: Created 6 chunks to process
2024-11-18T10:16:29Z [Information] INFORMATION: Processing chunks with max 2 parallel tasks
2024-11-18T10:16:29Z [Information] INFORMATION: Active tasks: 0, Remaining chunks: 6
2024-11-18T10:16:29Z [Information] INFORMATION: Starting new task for chunk 0
2024-11-18T10:16:29Z [Information] INFORMATION: Starting new task for chunk 1
2024-11-18T10:16:29Z [Information] INFORMATION: Processing completion for chunk 1
2024-11-18T10:16:29Z [Information] INFORMATION: Got result: {
"Status": "Success",
"EndDate": "2024-11-17T14:14:59",
"HasData": true,
"StartDate": "2024-11-17T10:14:59",
"BlobName": "Chuck-0.csv",
"ChunkNumber": 0,
"RecordsRetrieved": 586
}
2024-11-18T10:16:29Z [Information] INFORMATION: Added successful result for chunk 1
2024-11-18T10:16:29Z [Information] INFORMATION: Active tasks: 1, Remaining chunks: 4
2024-11-18T10:16:29Z [Information] INFORMATION: Starting new task for chunk 2
2024-11-18T10:16:29Z [Information] INFORMATION: Processing completion for chunk 2
2024-11-18T10:16:29Z [Information] INFORMATION: Got result: {
"Status": "Success",
"EndDate": "2024-11-17T14:14:59",
"HasData": true,
"StartDate": "2024-11-17T10:14:59",
"BlobName": "Chuck-0.csv",
"ChunkNumber": 0,
"RecordsRetrieved": 586
}