We have a message that can run for up to 1 hour. It needs to be processed only once. We are getting issues where the message is becoming available to other processors however when we attempt to complete the message.
Here is the setup in service bus:
We setup the service bus in the following way:
var options = new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
PrefetchCount = 0,
ReceiveMode = ServiceBusReceiveMode.PeekLock,
MaxAutoLockRenewalDuration = TimeSpan.FromHours(2),
};
We then handle the message in the following way:
private async Task HandleMessageAsync(ProcessMessageEventArgs processMessageEventArgs)
{
try
{
var rawMessageBody = Encoding.UTF8.GetString(processMessageEventArgs.Message.Body);
_logger.LogInformation("Received message {MessageId} with body {MessageBody}",
processMessageEventArgs.Message.MessageId, rawMessageBody);
var repoRequest = JsonConvert.DeserializeObject<TMessage>(rawMessageBody);
if (repoRequest != null)
{
await ProcessMessage(repoRequest, processMessageEventArgs.Message.MessageId,
processMessageEventArgs.Message.ApplicationProperties,
processMessageEventArgs.CancellationToken);
}
else
{
_logger.LogError(
"Unable to deserialize to message contract {ContractName} for message {MessageBody}",
typeof(TMessage), rawMessageBody);
}
_logger.LogInformation("Message {MessageId} processed", processMessageEventArgs.Message.MessageId);
await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
}
catch (Exception ex)
{
await processMessageEventArgs.AbandonMessageAsync(processMessageEventArgs.Message);
_logger.LogError(ex, "Unable to handle message");
}
}
However we are getting constant ServiceBusReceiver.RenewMessageLock exceptions along the way which is fine as the message keeps processing. However at the end of the message when we manually call await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
it is failing with
Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see . Reference:xxxxxxx, TrackingId:xxxxx, SystemTracker:gi::G9:4521499:amqps://xxxxxx.servicebus.windows/-eb5b7cf4;25:30:31:source(address:/xxxxxx,filter:[]), bi::in-connection1648(G9-96965)::session1654::link417153, Timestamp:2025-02-05T11:37:31 (MessageLockLost). For troubleshooting information, see . at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.DisposeMessageAsync(Guid lockToken, Outcome outcome, DispositionStatus disposition, TimeSpan timeout, IDictionary
2 propertiesToModify, String deadLetterReason, String deadLetterDescription) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteInternalAsync(Guid lockToken, TimeSpan timeout) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<CompleteAsync>b__47_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22
1.<b__22_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func
4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteAsync(Guid lockToken, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
What should we be doing differently. I know that even though MaxAutoLockRenewalDuration is set to 2 hours, Azure doesn't guarantee renewal under all conditions. But how should we be handling this?
We have a message that can run for up to 1 hour. It needs to be processed only once. We are getting issues where the message is becoming available to other processors however when we attempt to complete the message.
Here is the setup in service bus:
We setup the service bus in the following way:
var options = new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
PrefetchCount = 0,
ReceiveMode = ServiceBusReceiveMode.PeekLock,
MaxAutoLockRenewalDuration = TimeSpan.FromHours(2),
};
We then handle the message in the following way:
private async Task HandleMessageAsync(ProcessMessageEventArgs processMessageEventArgs)
{
try
{
var rawMessageBody = Encoding.UTF8.GetString(processMessageEventArgs.Message.Body);
_logger.LogInformation("Received message {MessageId} with body {MessageBody}",
processMessageEventArgs.Message.MessageId, rawMessageBody);
var repoRequest = JsonConvert.DeserializeObject<TMessage>(rawMessageBody);
if (repoRequest != null)
{
await ProcessMessage(repoRequest, processMessageEventArgs.Message.MessageId,
processMessageEventArgs.Message.ApplicationProperties,
processMessageEventArgs.CancellationToken);
}
else
{
_logger.LogError(
"Unable to deserialize to message contract {ContractName} for message {MessageBody}",
typeof(TMessage), rawMessageBody);
}
_logger.LogInformation("Message {MessageId} processed", processMessageEventArgs.Message.MessageId);
await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
}
catch (Exception ex)
{
await processMessageEventArgs.AbandonMessageAsync(processMessageEventArgs.Message);
_logger.LogError(ex, "Unable to handle message");
}
}
However we are getting constant ServiceBusReceiver.RenewMessageLock exceptions along the way which is fine as the message keeps processing. However at the end of the message when we manually call await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
it is failing with
Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:xxxxxxx, TrackingId:xxxxx, SystemTracker:gi::G9:4521499:amqps://xxxxxx.servicebus.windows.net/-eb5b7cf4;25:30:31:source(address:/xxxxxx,filter:[]), bi::in-connection1648(G9-96965)::session1654::link417153, Timestamp:2025-02-05T11:37:31 (MessageLockLost). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot. at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.DisposeMessageAsync(Guid lockToken, Outcome outcome, DispositionStatus disposition, TimeSpan timeout, IDictionary
2 propertiesToModify, String deadLetterReason, String deadLetterDescription) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteInternalAsync(Guid lockToken, TimeSpan timeout) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<CompleteAsync>b__47_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22
1.<b__22_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func
4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteAsync(Guid lockToken, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
What should we be doing differently. I know that even though MaxAutoLockRenewalDuration is set to 2 hours, Azure doesn't guarantee renewal under all conditions. But how should we be handling this?
Share Improve this question asked Feb 5 at 18:34 user1112324user1112324 6431 gold badge9 silver badges23 bronze badges 5 |1 Answer
Reset to default 0Auto-lock renewal is not reliable for long-running tasks. manually renew the lock.
As @SeanFeldman, @VladDX said,
Store the message in a database or blob storage.
Acknowledge the message immediately process it separately in an independent worker.
Instead of Auto-lock renewal session-based messages provides an exclusive lock that lasts as long as the session remains active.
Azure Service Bus locks the entire session as long as you keep the session open, the message won’t be available to other consumers.
Same processor continues handling the session’s messages until it’s closed.
Enable "session" while creating the queue in service bus.
Consume all messages in the queue using your existing regular queue processor before enabling sessions.
Once the queue is empty, enable sessions and start sending messages with
SessionId
.
Updated the Sender to Including SessionId
:
var message = new ServiceBusMessage(Encoding.UTF8.GetBytes("Your message body"))
{
SessionId = "MySession-" + Guid.NewGuid().ToString() // Each message belongs to a session
};
await sender.SendMessageAsync(message);
Use a Session-Based Processor:
var processor = client.CreateSessionProcessor(queueName, new ServiceBusSessionProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentSessions = 1, // Ensures only one processor at a time
MaxConcurrentCallsPerSession = 1, // Ensures messages in a session are processed sequentially
SessionIdleTimeout = TimeSpan.FromMinutes(5), // Releases session lock if idle
PrefetchCount = 0
});
// Message handler for session-based processing
processor.ProcessMessageAsync += async args =>
{
try
{
var messageBody = Encoding.UTF8.GetString(args.Message.Body);
_logger.LogInformation("Processing message {MessageId} in session {SessionId}", args.Message.MessageId, args.SessionId);
// Simulate long processing time (e.g., 1 hour)
await Task.Delay(TimeSpan.FromMinutes(60));
// Complete the message after processing
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message {MessageId}");
await args.AbandonMessageAsync(args.Message);
}
};
// Error handler
processor.ProcessErrorAsync += args =>
{
_logger.LogError(args.Exception, "Error in Service Bus session processor");
return Task.CompletedTask;
};
// Start the processor
await processor.StartProcessingAsync();
RenewMessageLockAsync
periodically while processing the message. If this is a rare issue use explicit lock renewal or if this happens all the time: Use a tracking database or the Inbox pattern as @SeanFeldman and @VladDX said. – Suresh Chikkam Commented Feb 6 at 4:20RenewMessageLockAsync
or an app does. When a lock is lost in a scenario like this, it generally is because the service state is lost, not that renewal was not attempted. – Jesse Squire Commented Feb 6 at 19:15