I use MassTransit with Kafka. I expected MassTransit to handle an exception thrown in my consumer by not acknowledging the message.
My consumer saves messages to the inbox; if the database is unavailable, an exception is thrown. How can I avoid acknowledging the message and leave the offset unchanged?

    public class ConfirmationTokenCreatedConsumer(IInbox inbox) : IConsumer<ConfirmationTokenCreated>
    {
        public async Task Consume(ConsumeContext<ConfirmationTokenCreated> context)
        {
            var @event = context.Message;
            var inputConsumerEvent = new ConfirmationTokenCreatedConsumerEvent(
                @event.EventId,
                @event.EmailAddress,
                @event.ConfirmationUrl);
            await inbox.Save(inputConsumerEvent);
        }
    }

The Save method can throw an NpgsqlException if the database is unavailable:

    ...
    await using var transaction = await context.Database.BeginTransactionAsync();
    try
    {
        await context.Inbox.AddAsync(inboxEvent);
        await context.SaveChangesAsync();
        await transaction.CommitAsync();
    }
    catch
    {
        await transaction.RollbackAsync();
        throw;
    }