- When I enabled message Redelivery: I encountered an issue while consuming messages with a batch consumer, despite throwing an exception properly, the message still moved to the skip queue instead of the fault/error queue.
- But If I disabled Redelivery, messages moved to fault/error queue properly after configured retries I tried adjusting redelivery and retry configuration but still not success. For reference below is my sample snippet:
Startup Configuration
services.AddMassTransit(x =>
{
x.AddConsumer<NotificationConsumer>(c =>
{
if (_enableRedelivery)
{
c.UseScheduledRedelivery(r => r.Intervals(
TimeSpan.FromMinutes(1),
TimeSpan.FromMinutes(3),
TimeSpan.FromMinutes(5)
));
}
c.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(2)));
c.UseInMemoryOutbox();
c.UseConcurrencyLimit(1);
});
x.AddConsumer<NotificationFaultConsumer>();
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(
new Uri(Configuration.GetValue<string>("RabbitMq:ServerRootAddress")),
h =>
{
h.Username(Configuration.GetValue<string>("RabbitMq:UserName"));
h.Password(Configuration.GetValue<string>("RabbitMq:Password"));
h.ConfigureBatchPublish(batchPublishConfigurator =>
{
batchPublishConfigurator.Enabled = true;
batchPublishConfigurator.Timeout = TimeSpan.FromMilliseconds(5);
});
}
);
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
Consumer
public class NotificationConsumer : IConsumer<Batch<NotificationMessage>>
{
private readonly INotificationService _notificationService;
public NotificationConsumer(INotificationService notificationService)
{
_notificationService = notificationService;
}
public async Task Consume(ConsumeContext<Batch<NotificationMessage>> context)
{
try
{
await _notificationService.ProcessNotification(context.Message);
}
catch (Exception ex)
{
// Log Exception
throw;
}
}
}
Fault Consumer
public class NotificationFaultConsumer : IConsumer<Fault<Batch<NotificationMessage>>>
{
public NotificationFaultConsumer()
{
// Logger
}
public async Task Consume(ConsumeContext<Fault<Batch<NotificationMessage>>> context)
{
var sourceQueue = context.SourceAddress.Segments.Last();
try
{
// Logging
// Do some business
}
catch (Exception ex)
{
// Logging
}
}
}
MassTransit version
<PackageReference Include="MassTransit" Version="7.3.1" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.3.1" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.3.1" />
<PackageReference Include="MassTransit.Hangfire" Version="7.3.1" />
Is there anything I missing, I need both retry and redelivery and message should moved to fault/error queue.