c# - Azure service bus: Long running message not renewing - Stack Overflow

We have a message that can take up to 1 hour to process, and it must be processed only once. The problem is that the message becomes available to other processors while we are still working on it, and our attempt to complete it then fails.

We set up the Service Bus processor in the following way:

var options = new ServiceBusProcessorOptions
{
    AutoCompleteMessages = false,

    MaxConcurrentCalls = 1,
    PrefetchCount = 0,
    ReceiveMode = ServiceBusReceiveMode.PeekLock,
    MaxAutoLockRenewalDuration = TimeSpan.FromHours(2),

};

We then handle the message in the following way:

private async Task HandleMessageAsync(ProcessMessageEventArgs processMessageEventArgs)
{
    try
    {
        var rawMessageBody = Encoding.UTF8.GetString(processMessageEventArgs.Message.Body);
        _logger.LogInformation("Received message {MessageId} with body {MessageBody}",
            processMessageEventArgs.Message.MessageId, rawMessageBody);

        var repoRequest = JsonConvert.DeserializeObject<TMessage>(rawMessageBody);
        if (repoRequest != null)
        {
            await ProcessMessage(repoRequest, processMessageEventArgs.Message.MessageId,
                processMessageEventArgs.Message.ApplicationProperties,
                processMessageEventArgs.CancellationToken);
        }
        else
        {
            _logger.LogError(
                "Unable to deserialize to message contract {ContractName} for message {MessageBody}",
                typeof(TMessage), rawMessageBody);
        }

        _logger.LogInformation("Message {MessageId} processed", processMessageEventArgs.Message.MessageId);

        await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
    }
    catch (Exception ex)
    {
        await processMessageEventArgs.AbandonMessageAsync(processMessageEventArgs.Message);
        _logger.LogError(ex, "Unable to handle message");
    }
}

We get constant ServiceBusReceiver.RenewMessageLock exceptions along the way, which we can live with because the message keeps processing. However, when we manually call await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message); at the end of processing, it fails with:

Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:xxxxxxx, TrackingId:xxxxx, SystemTracker:gi::G9:4521499:amqps://xxxxxx.servicebus.windows.net/-eb5b7cf4;25:30:31:source(address:/xxxxxx,filter:[]), bi::in-connection1648(G9-96965)::session1654::link417153, Timestamp:2025-02-05T11:37:31 (MessageLockLost). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot.
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.DisposeMessageAsync(Guid lockToken, Outcome outcome, DispositionStatus disposition, TimeSpan timeout, IDictionary`2 propertiesToModify, String deadLetterReason, String deadLetterDescription)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteInternalAsync(Guid lockToken, TimeSpan timeout)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<CompleteAsync>b__47_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__221.<b__22_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteAsync(Guid lockToken, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)

What should we be doing differently? I know that even though MaxAutoLockRenewalDuration is set to 2 hours, Azure doesn't guarantee renewal under all conditions. How should we be handling this?

Asked Feb 5 at 18:34 by user1112324. 5 comments:
  • Two hours is not an insignificant amount of time. Lock renewal from the client side is definitely not a guaranteed operation. The question is whether you get lock lost exceptions occasionally or all the time. If it's all the time, I'd check with the service team (github.com/Azure/azure-service-bus/issues) to see if the duration you're choosing is supported. Based on the answer, plan accordingly. You could complete the message by adding a tracking record in the DB and issuing a "refresh status" message in the future until the long-running task is completed, but it involves a lot of custom code. – Sean Feldman Commented Feb 5 at 19:30
  • I think that it's better to utilize the Inbox pattern. Take the message, save it locally, complete the message. Then later process it in any way you like. I don't think that Service Bus was made for your scenario. – Vlad DX Commented Feb 5 at 21:04
  • Azure does not guarantee automatic renewal, so you can call RenewMessageLockAsync periodically while processing the message. If this is a rare issue, use explicit lock renewal; if it happens all the time, use a tracking database or the Inbox pattern, as @SeanFeldman and @VladDX said. – Suresh Chikkam Commented Feb 6 at 4:20
  • Thanks so much for your answers. Yes the renewal message exception happens all the time. Usually however the message doesn't reprocess once we call complete message. But as you said we likely need to make a note of the messageid and then track it to completion or just throw to reprocess the message if required. Little confused though - what is servicebus meant for if not jobs? What is a better practice? – user1112324 Commented Feb 6 at 8:38
  • @SureshChikkam: "Azure does not guarantee automatic renewal" is an unfounded statement. Lock renewal is entirely a client-side operation - whether automatic or manual. The only difference is whether the processor calls RenewMessageLockAsync or an app does. When a lock is lost in a scenario like this, it generally is because the service state is lost, not that renewal was not attempted. – Jesse Squire Commented Feb 6 at 19:15

1 Answer

Auto-lock renewal is not reliable for long-running tasks. Renew the lock manually instead.
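
Manual renewal could be sketched roughly as follows. This is only an illustration under stated assumptions: LockRenewal and RunWithRenewalAsync are hypothetical names, not part of the Azure SDK, and the helper treats any renewal failure as a lost lock and stops the work, which may be too strict for some workloads. The renewal itself is passed in as a delegate so the helper stays independent of the SDK.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Azure.Messaging.ServiceBus).
public static class LockRenewal
{
    // Runs `work` while invoking `renew` every `interval`.
    // Returns false if a renewal failed, so the caller can skip CompleteMessageAsync.
    public static async Task<bool> RunWithRenewalAsync(
        Func<CancellationToken, Task> work,
        Func<CancellationToken, Task> renew,
        TimeSpan interval,
        CancellationToken cancellationToken = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var lockHeld = true;

        var renewalLoop = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    await Task.Delay(interval, cts.Token);
                    await renew(cts.Token);
                }
            }
            catch (OperationCanceledException)
            {
                // Work finished or the caller cancelled: stop renewing.
            }
            catch (Exception)
            {
                // Assumption: a failed renewal means the lock is gone.
                if (!cts.IsCancellationRequested)
                {
                    lockHeld = false;
                    cts.Cancel(); // ask the work to stop
                }
            }
        });

        try
        {
            await work(cts.Token);
        }
        catch (OperationCanceledException) when (!lockHeld)
        {
            // Work was cancelled because the lock was lost; report via the return value.
        }
        finally
        {
            cts.Cancel();
            await renewalLoop;
        }

        return lockHeld;
    }
}
```

Inside the handler from the question, the call might look like LockRenewal.RunWithRenewalAsync(ct => ProcessMessage(repoRequest, args.Message.MessageId, args.Message.ApplicationProperties, ct), ct => args.RenewMessageLockAsync(args.Message, ct), TimeSpan.FromSeconds(30), args.CancellationToken), with CompleteMessageAsync called only when the result is true. The 30-second interval is an assumption; it should be comfortably shorter than the queue's lock duration.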

Alternatively, as @SeanFeldman and @VladDX said:

  • Store the message in a database or blob storage.

  • Complete (acknowledge) the message immediately, then process it separately in an independent worker.
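
The two steps above could be sketched like this. It is a minimal in-memory illustration, not a production design: InMemoryInbox is a hypothetical class, and a real inbox would be a database table or blob container keyed by MessageId so that stored work survives restarts.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical inbox; stands in for a durable store keyed by MessageId.
public sealed class InMemoryInbox
{
    private readonly ConcurrentDictionary<string, string> _pending = new();
    private readonly ConcurrentDictionary<string, bool> _done = new();

    // Returns false for a duplicate delivery (already stored or already processed).
    public bool TryStore(string messageId, string body) =>
        !_done.ContainsKey(messageId) && _pending.TryAdd(messageId, body);

    // Called by the independent worker; no Service Bus lock is held here.
    public async Task ProcessPendingAsync(Func<string, string, Task> work)
    {
        foreach (var (id, body) in _pending)
        {
            await work(id, body);
            _done[id] = true;               // mark done first so a redelivery is rejected
            _pending.TryRemove(id, out _);
        }
    }
}
```

In the Service Bus handler, this would amount to calling TryStore(args.Message.MessageId, args.Message.Body.ToString()) and then CompleteMessageAsync right away, so the message lock is held for seconds rather than an hour. The hour-long job then runs from ProcessPendingAsync, and the MessageId check gives the "process only once" behaviour even if the broker redelivers.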

As an alternative to relying on per-message auto-lock renewal, session-based messages give one receiver an exclusive lock on the whole session.

While that receiver holds the session lock, none of the session's messages are available to other consumers. Note that the session lock also has to be renewed; the session processor does this automatically for up to MaxAutoLockRenewalDuration.

The same processor continues handling the session's messages until the session is closed.

Enable "session" while creating the queue in service bus.

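  • Consume all messages in the existing queue using your regular queue processor before switching to sessions.

  • Once the queue is empty, switch to the session-enabled queue and start sending messages with a SessionId. (Sessions are chosen when a queue is created, so this normally means creating a new queue.)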
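
A session-enabled queue can also be created from code. The sketch below assumes the Azure.Messaging.ServiceBus package and a connection string with manage rights; QueueSetup and CreateSessionQueueAsync are hypothetical names, and the 5-minute lock duration is simply the largest value the service accepts, not a recommendation from the original answer.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical setup helper.
public static class QueueSetup
{
    public static async Task CreateSessionQueueAsync(string connectionString, string queueName)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);

        // Nothing to do if the queue is already there.
        if ((await admin.QueueExistsAsync(queueName)).Value)
        {
            return;
        }

        var options = new CreateQueueOptions(queueName)
        {
            RequiresSession = true,                 // must be set at creation time
            LockDuration = TimeSpan.FromMinutes(5)  // longer lock means fewer renewals
        };

        await admin.CreateQueueAsync(options);
    }
}
```

It could be called once at startup, for example as await QueueSetup.CreateSessionQueueAsync(connectionString, "long-jobs-sessions"), where the queue name is made up for illustration.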

Update the sender to include a SessionId:

var message = new ServiceBusMessage(Encoding.UTF8.GetBytes("Your message body"))
{
    SessionId = "MySession-" + Guid.NewGuid().ToString() // Each message belongs to a session
};
await sender.SendMessageAsync(message);

Use a Session-Based Processor:

var processor = client.CreateSessionProcessor(queueName, new ServiceBusSessionProcessorOptions
{
    AutoCompleteMessages = false,
    MaxConcurrentSessions = 1, // This processor handles one session at a time
    MaxConcurrentCallsPerSession = 1, // Ensures messages in a session are processed sequentially
    MaxAutoLockRenewalDuration = TimeSpan.FromHours(2), // Default is 5 minutes, too short for a 1-hour job
    SessionIdleTimeout = TimeSpan.FromMinutes(5), // Releases session lock if idle
    PrefetchCount = 0
});

// Message handler for session-based processing
processor.ProcessMessageAsync += async args =>
{
    try
    {
        var messageBody = Encoding.UTF8.GetString(args.Message.Body);
        _logger.LogInformation("Processing message {MessageId} in session {SessionId}", args.Message.MessageId, args.SessionId);

        // Simulate long processing time (e.g., 1 hour)
        await Task.Delay(TimeSpan.FromMinutes(60), args.CancellationToken);

        // Complete the message after processing
        await args.CompleteMessageAsync(args.Message);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error processing message {MessageId}", args.Message.MessageId);
        await args.AbandonMessageAsync(args.Message);
    }
};

// Error handler
processor.ProcessErrorAsync += args =>
{
    _logger.LogError(args.Exception, "Error in Service Bus session processor");
    return Task.CompletedTask;
};

// Start the processor
await processor.StartProcessingAsync();