How to Implement Controlled Parallelism with Azure PowerShell Durable Functions to Work with Exchange Module Session Limits?

I am working on a solution that uses Azure Durable Functions to run multiple tasks in parallel with the ExchangeOnlineManagement module. The tasks collect the Unified Audit Log, and running them in parallel should speed up some long-running operations. Here is my current setup and challenge:

Overall Goal:

  1. I want to use Azure Durable Functions to run activities in parallel, leveraging the ExchangeOnlineManagement module.
  2. However, the ExchangeOnlineManagement module allows only 3-5 concurrent sessions at a time.
  3. Therefore, I need a way to ensure that I do not exceed this session limit while still running as many activities concurrently as possible to speed up processing.

Current Workflow:

  1. HTTP Starter: I trigger the workflow with an HTTP request.
  2. Orchestrator: This function is designed to:
    • Create "chunks" of tasks that can run concurrently.
    • Limit the number of concurrent sessions to respect the Connect-ExchangeOnline session limit.
    • Wait for a task to complete before starting a new one if the maximum concurrent limit is reached.
  3. Activities performing the tasks. (This part works)

The challenge I am facing is how to effectively manage the parallelism in the orchestrator, such that it runs a maximum of 3-5 activities concurrently, waits for any of them to complete, and then starts a new one while maintaining an efficient workflow.

My issue

Where I'm stuck is trying to run tasks in parallel, but only 2-3 at a time, ensuring that I wait for one task to complete before starting the next. To track progress, I store the number of chunks to be processed, along with the counts for completed and remaining chunks, in variables. However, every time the orchestrator function restarts, these variables reset, causing all progress to be lost.

I have tried several approaches, such as using Set-DurableCustomStatus, creating script-level or global variables, and only setting the variables when it's not a replay, but none have worked to maintain state across orchestrator replays.

How would I keep track of the progress in my Orchestrator, or is this not something you should do in an Orchestrator? Or are there better solutions available, like storing state in an Azure Queue?

How my code currently works

My HTTP starter is triggered with a start date, end date, and interval. Based on this information, several time chunks are calculated. Currently, for testing purposes, I'm using a 1 day period with an interval of 240 minutes, resulting in 6 chunks. The idea is to trigger an activity for each of the 6 chunks, but only run 2 activities at the same time in my testing setup.

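# Build the list of time chunks between startDate and endDate.
# Note: $input is also a PowerShell automatic variable; here it is assumed
# to hold the orchestrator input (for example, copied from $Context.Input).
$timeChunks = @()
$currentStart = [DateTime]::Parse($input.startDate)
$endDate = [DateTime]::Parse($input.endDate)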

while ($currentStart -lt $endDate) {
    $currentEnd = $currentStart.AddMinutes($input.interval)
    if ($currentEnd -gt $endDate) {
        $currentEnd = $endDate
    }
    
    $timeChunks += @{
        startDate = $currentStart.ToString("yyyy-MM-ddTHH:mm:ss")
        endDate = $currentEnd.ToString("yyyy-MM-ddTHH:mm:ss")
        tenantId = $input.tenantId
        maxResults = 5000
        chunkNumber = $timeChunks.Count
    }
    $currentStart = $currentEnd
}
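
As a quick sanity check on the chunk arithmetic, the number of chunks should equal the length of the range divided by the interval, rounded up. The sketch below uses a hypothetical helper, Get-ChunkCount, which is not part of the original script; the dates are the ones from my test run.

```powershell
# Hypothetical helper (not part of the original script): predicts how many
# chunks the loop above produces for a given range and interval.
function Get-ChunkCount {
    param(
        [datetime]$StartDate,
        [datetime]$EndDate,
        [int]$IntervalMinutes
    )
    if ($IntervalMinutes -lt 1) { throw "IntervalMinutes must be at least 1." }

    $totalMinutes = ($EndDate - $StartDate).TotalMinutes
    # A final partial interval still counts as one chunk, hence the ceiling.
    # An empty or reversed range yields zero chunks, like the while loop.
    [math]::Max(0, [int][math]::Ceiling($totalMinutes / $IntervalMinutes))
}

# 1 day = 1440 minutes; 1440 / 240 = 6 chunks
$count = Get-ChunkCount -StartDate '2024-11-17T10:14:59' -EndDate '2024-11-18T10:14:59' -IntervalMinutes 240
"Chunks: $count"
```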

In the next section, I initiate two parallel tasks and then wait for one to complete. Once a task completes, it is recorded in $completedChunkNumber, which I use to track which chunks are finished and which are still pending. However, every time the orchestrator restarts, the state resets to an empty array, resulting in all progress being lost.

# Process chunks while maintaining 2 parallel tasks
while ($activeTasks.Count -gt 0 -or $remainingChunks.Count -gt 0) {
    Write-Host "Active tasks: $($activeTasks.Count), Remaining chunks: $($remainingChunks.Count)"

    # Start new tasks if needed
    while ($activeTasks.Count -lt $maxParallelTasks -and $remainingChunks.Count -gt 0) {
        $chunk = $remainingChunks.Dequeue()
        Write-Host "Starting new task for chunk $($chunk.chunkNumber)"
        $task = Invoke-DurableActivity -FunctionName "Get-UAL-Activity" -Input $chunk -NoWait
        $activeTasks[$chunk.chunkNumber] = @{
            Task = $task
            Chunk = $chunk
        }
    }
    
    if ($activeTasks.Count -gt 0) {
        $tasks = @($activeTasks.Values | ForEach-Object { $_.Task })
        $completedTask = Wait-DurableTask -Task $tasks -Any
        
        # Find completed chunk
        $completedEntry = $activeTasks.GetEnumerator() | 
            Where-Object { $_.Value.Task.TaskId -eq $completedTask.TaskId } | 
            Select-Object -First 1
    
        $completedChunkNumber = $completedEntry.Key
        $originalChunk = $completedEntry.Value.Chunk
            
        Write-Host "Processing completion for chunk $completedChunkNumber"
            
        # Process result
        $result = Get-DurableTaskResult -Task $completedTask
        Write-Host "Got result: $($result | ConvertTo-Json)"
                        
        # Add to appropriate output collection
        if ($result.Status -eq "Success") {
            $finalOutput.Success += $result
            Write-Host "Added successful result for chunk $($result.ChunkNumber)"
        }
        else {
            $finalOutput.Failed += $result 
            Write-Host "Added failed result for chunk $($result.ChunkNumber)"
        }
        
        $finalOutput.ProcessedChunks += $completedChunkNumber
        
        
        # Remove completed task
        $activeTasks.Remove($completedChunkNumber)
    }
}
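
One simpler alternative to the sliding window above, sketched under assumptions: split the chunks into fixed-size batches and wait for each whole batch before starting the next. This never exceeds the session limit, at the cost of idling while the slowest task in a batch finishes. Split-IntoBatches is a hypothetical helper, and the commented lines show roughly how a batch might be dispatched with Invoke-DurableActivity -NoWait and Wait-DurableTask; that part is untested here and only runs inside a Functions host.

```powershell
# Hypothetical helper: groups items into arrays of at most $BatchSize elements.
function Split-IntoBatches {
    param(
        [object[]]$Items,
        [int]$BatchSize
    )
    if ($BatchSize -lt 1) { throw "BatchSize must be at least 1." }

    $batches = [System.Collections.Generic.List[object]]::new()
    for ($i = 0; $i -lt $Items.Count; $i += $BatchSize) {
        $last = [math]::Min($i + $BatchSize, $Items.Count) - 1
        # @() keeps a single-item batch as an array
        $batches.Add(@($Items[$i..$last]))
    }
    # The unary comma stops PowerShell from flattening the outer array on output
    , $batches.ToArray()
}

# Stand-in chunks with the same chunkNumber property as in the question
$chunks = 0..5 | ForEach-Object { @{ chunkNumber = $_ } }
$batches = Split-IntoBatches -Items $chunks -BatchSize 2

foreach ($batch in $batches) {
    "Batch: " + (($batch | ForEach-Object { $_.chunkNumber }) -join ',')
    # Inside the orchestrator, each batch might be dispatched roughly like this:
    # $tasks = foreach ($chunk in $batch) {
    #     Invoke-DurableActivity -FunctionName 'Get-UAL-Activity' -Input $chunk -NoWait
    # }
    # $results = Wait-DurableTask -Task $tasks   # waits for the whole batch
}
```

Because the batches depend only on the orchestrator input, the same grouping is produced on every replay, so no extra state needs to be stored to know which batch comes next.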

I attempted to keep track of progress by using Set-DurableCustomStatus, but I'm unsure how to retrieve and use this status within the orchestrator itself, as its primary purpose seems to be for external status checking. How can I track progress within the orchestrator without losing it on every replay?
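
For context, a general property of Durable Functions: an orchestrator function is re-executed from the top each time it resumes, and the results of activities that already completed are fed back from the stored history instead of being run again. Local variables therefore start empty on every run and are rebuilt as the code passes the same points again. The toy model below illustrates the idea in plain PowerShell. It calls no Durable Functions cmdlets; Invoke-ToyOrchestrator and $History are hypothetical stand-ins for the runtime and its recorded history.

```powershell
# Toy model of orchestrator replay (no Durable Functions cmdlets involved).
# $History stands in for the results the runtime has already recorded.
function Invoke-ToyOrchestrator {
    param(
        [object[]]$History,
        [int]$TotalChunks
    )

    $processed = @()   # starts empty on every run, like any local variable
    for ($i = 0; $i -lt $TotalChunks; $i++) {
        if ($i -ge $History.Count) {
            # No recorded result yet: a real orchestrator would pause here
            return [pscustomobject]@{ Finished = $false; Processed = $processed }
        }
        # A recorded result is "replayed" back into the local variable
        $processed += $History[$i]
    }
    [pscustomobject]@{ Finished = $true; Processed = $processed }
}

$history = @()
$run1 = Invoke-ToyOrchestrator -History $history -TotalChunks 3   # nothing recorded yet

$history += 'chunk-0'
$run2 = Invoke-ToyOrchestrator -History $history -TotalChunks 3   # one result replayed

$history += 'chunk-1', 'chunk-2'
$run3 = Invoke-ToyOrchestrator -History $history -TotalChunks 3   # all results replayed

"Run 1: $($run1.Processed.Count) processed, finished = $($run1.Finished)"
"Run 2: $($run2.Processed.Count) processed, finished = $($run2.Finished)"
"Run 3: $($run3.Processed.Count) processed, finished = $($run3.Finished)"
```

In this model, $processed is empty at the start of each run yet ends up reflecting everything recorded so far. If the real orchestrator behaves the same way, a repeated start-of-orchestrator log line alone does not necessarily mean progress was lost, provided the code takes the same path on every run.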

Azure Function Logs

My HTTP trigger calls the Orchestrator with the following output:

2024-11-18T10:15:21Z   [Information]   INFORMATION: Sending input: {
  "endDate": "2024-11-18T10:14:59",
  "timeoutMinutes": 6000,
  "tenantId": "blabla-ebfa12e7a31c",
  "startDate": "2024-11-17T10:14:59",
  "interval": 240
}

Then the Orchestrator calculates the chunks and starts tasks 0 and 1. It then gets the results for both of them before starting over and initiating tasks 0 and 1 again:

2024-11-18T10:16:23Z   [Information]   INFORMATION: ================ ORCHESTRATOR START ================
2024-11-18T10:16:23Z   [Information]   INFORMATION: Created 6 chunks to process
2024-11-18T10:16:23Z   [Information]   INFORMATION: Processing chunks with max 2 parallel tasks
2024-11-18T10:16:23Z   [Information]   INFORMATION: Active tasks: 0, Remaining chunks: 6
2024-11-18T10:16:23Z   [Information]   INFORMATION: Starting new task for chunk 0
2024-11-18T10:16:23Z   [Information]   INFORMATION: Starting new task for chunk 1
2024-11-18T10:16:23Z   [Information]   INFORMATION: Processing completion for chunk 1
2024-11-18T10:16:23Z   [Information]   INFORMATION: Got result: {
  "Status": "Success",
  "EndDate": "2024-11-17T14:14:59",
  "HasData": true,
  "StartDate": "2024-11-17T10:14:59",
  "BlobName": "Chuck-0.csv",
  "ChunkNumber": 0,
  "RecordsRetrieved": 586
}
2024-11-18T10:16:23Z   [Information]   INFORMATION: Added successful result for chunk 1
2024-11-18T10:16:23Z   [Information]   INFORMATION: Active tasks: 1, Remaining chunks: 4
2024-11-18T10:16:23Z   [Information]   INFORMATION: Starting new task for chunk 2
2024-11-18T10:16:23Z   [Information]   INFORMATION: Processing completion for chunk 2
2024-11-18T10:16:23Z   [Information]   INFORMATION: Got result: {
  "Status": "Success",
  "EndDate": "2024-11-17T14:14:59",
  "HasData": true,
  "StartDate": "2024-11-17T10:14:59",
  "BlobName": "Chuck-0.csv",
  "ChunkNumber": 0,
  "RecordsRetrieved": 586
}
2024-11-18T10:16:23Z   [Information]   INFORMATION: Added successful result for chunk 2
2024-11-18T10:16:23Z   [Information]   INFORMATION: Active tasks: 1, Remaining chunks: 3
2024-11-18T10:16:23Z   [Information]   INFORMATION: Starting new task for chunk 3
2024-11-18T10:16:23Z   [Information]   cacb3bf3-6042-4050-94af-dd28682f9f4e: Function 'Get-UAL-Activity (Activity)' scheduled. Reason: Get-UAL-Orchestrator. IsReplay: False. State: Scheduled. RuntimeStatus: Pending. HubName: AzureLogs. AppName: AzureLogs. SlotName: Production. ExtensionVersion: 2.13.4. SequenceNumber: 15.
2024-11-18T10:16:23Z   [Information]   Executed 'Functions.Get-UAL-Orchestrator' (Succeeded, Id=a989d96f-a98b-47ea-a9c2-6d407fdc5ee8, Duration=35ms)
2024-11-18T10:16:29Z   [Information]   Executing 'Functions.Get-UAL-Orchestrator' (Reason='(null)', Id=892258ce-29f7-4ded-9be1-5060c7fad9fb)
2024-11-18T10:16:29Z   [Verbose]   Sending invocation id: '892258ce-29f7-4ded-9be1-5060c7fad9fb
2024-11-18T10:16:29Z   [Verbose]   Posting invocation id:892258ce-29f7-4ded-9be1-5060c7fad9fb on workerId:b273c3af-20e0-4365-b70d-218b4c6e49ea
2024-11-18T10:16:29Z   [Information]   INFORMATION: ================ ORCHESTRATOR START ================
2024-11-18T10:16:29Z   [Information]   INFORMATION: Created 6 chunks to process
2024-11-18T10:16:29Z   [Information]   INFORMATION: Processing chunks with max 2 parallel tasks
2024-11-18T10:16:29Z   [Information]   INFORMATION: Active tasks: 0, Remaining chunks: 6
2024-11-18T10:16:29Z   [Information]   INFORMATION: Starting new task for chunk 0
2024-11-18T10:16:29Z   [Information]   INFORMATION: Starting new task for chunk 1
2024-11-18T10:16:29Z   [Information]   INFORMATION: Processing completion for chunk 1
2024-11-18T10:16:29Z   [Information]   INFORMATION: Got result: {
  "Status": "Success",
  "EndDate": "2024-11-17T14:14:59",
  "HasData": true,
  "StartDate": "2024-11-17T10:14:59",
  "BlobName": "Chuck-0.csv",
  "ChunkNumber": 0,
  "RecordsRetrieved": 586
}
2024-11-18T10:16:29Z   [Information]   INFORMATION: Added successful result for chunk 1
2024-11-18T10:16:29Z   [Information]   INFORMATION: Active tasks: 1, Remaining chunks: 4
2024-11-18T10:16:29Z   [Information]   INFORMATION: Starting new task for chunk 2
2024-11-18T10:16:29Z   [Information]   INFORMATION: Processing completion for chunk 2
2024-11-18T10:16:29Z   [Information]   INFORMATION: Got result: {
  "Status": "Success",
  "EndDate": "2024-11-17T14:14:59",
  "HasData": true,
  "StartDate": "2024-11-17T10:14:59",
  "BlobName": "Chuck-0.csv",
  "ChunkNumber": 0,
  "RecordsRetrieved": 586
}
