I'm building a piece of software responsible for processing different types of events from my system. This component is hosted as an Azure Function App, triggered by an Azure Storage Queue. The challenge is that I can have many different event types enqueued into the same queue.
To handle this, I've created a base message record:
public abstract record BaseMessage
{
public virtual string MessageName { get; init; } = nameof(BaseMessage);
public Guid CorrelationId { get; init; }
public Guid ProcessId { get; init; }
public int DequeueCount { get; init; }
}
And specific records for different events, for example:
public record DataProcessingRequestedEvent : BaseMessage
{
public Guid ProcessDataId { get; init; }
public override string MessageName => $"{nameof(DataProcessingRequestedEvent)}";
}
Azure Function Implementation:
[Function("MessageBrokerFunction")]
public void Run([QueueTrigger("message-broker", Connection = "StorageQueue")] QueueMessage queueMessage)
{
var anyEventResult = AnyMessage.Create(queueMessage.MessageId, queueMessage.MessageText);
if (anyEventResult.IsFailed)
{
_logger.LogError($"Failed to parse message: {anyEventResult.Errors}");
return;
}
_messageBroker.Handle(anyEventResult.Value);
}
To remove dependency on queue-specific types, I convert QueueMessage into an AnyMessage class:
public class AnyMessage
{
public string MessageId { get; init; }
public JObject MessageBody { get; init; }
public string MessageName { get; init; }
public static Result<AnyMessage> Create(string messageId, string messageBody)
{
try
{
var parsedMessageBody = JObject.Parse(messageBody);
var messageName = parsedMessageBody.Properties()
.FirstOrDefault(p => string.Equals(p.Name, nameof(BaseMessage.MessageName), StringComparison.OrdinalIgnoreCase))
?.Value?.ToString();
if (string.IsNullOrEmpty(messageName))
{
return Result.Fail(new MissingMessageTypeError(messageBody));
}
return Result.Ok(new AnyMessage
{
MessageId = messageId,
MessageBody = parsedMessageBody,
MessageName = messageName
});
}
catch (Exception ex)
{
return Result.Fail(new ExceptionOccuredWhileParsingMessageError(messageBody, ex));
}
}
}
Event Handling and Dispatching:
public interface IEventHandler<T> where T : BaseMessage
{
Task Handle(T eventMessage, EventExecutionContext context);
}
public interface IEventDispatcher
{
Task<EventExecutionResult> Dispatch<T>(T message, EventExecutionContext context) where T : BaseMessage;
}
public class EventDispatcher : IEventDispatcher
{
private readonly ILogger<EventDispatcher> _logger;
private readonly IServiceProvider _serviceProvider;
public EventDispatcher(ILogger<EventDispatcher> logger, IServiceProvider serviceProvider)
{
_logger = logger;
_serviceProvider = serviceProvider;
}
public async Task<EventExecutionResult> Dispatch<T>(T message, EventExecutionContext context) where T : BaseMessage
{
try
{
var handlerType = typeof(IEventHandler<>).MakeGenericType(typeof(T));
var handler = _serviceProvider.GetService(handlerType);
var handleMethod = handler.GetType().GetMethod("Handle");
var task = (Task)handleMethod.Invoke(handler, new object[] { message, context });
await task.ConfigureAwait(false);
return new EventExecutionResult();
}
catch (Exception ex)
{
// TODO: Handle exceptions appropriately
_logger.LogError(ex, "Error dispatching event.");
return new EventExecutionResult();
}
}
}
The Core Issue: The challenge lies in determining where to convert the AnyMessage into a specific event type and then invoke the event dispatcher.
One approach is to introduce an additional layer, such as an IMessagePreProcessor for each MessageName. This would receive the AnyMessage, deserialize it into the specific event type, and then invoke the EventDispatcher.
However, I'm wondering if there is a better, more dynamic solution that:
Doesn't rely on compile-time types Can dynamically deserialize the message and invoke the appropriate event handler at runtime
Any suggestions or recommendations would be greatly appreciated!
I'm building a piece of software responsible for processing different types of events from my system. This component is hosted as an Azure Function App, triggered by an Azure Storage Queue. The challenge is that I can have many different event types enqueued into the same queue.
To handle this, I've created a base message record:
public abstract record BaseMessage
{
public virtual string MessageName { get; init; } = nameof(BaseMessage);
public Guid CorrelationId { get; init; }
public Guid ProcessId { get; init; }
public int DequeueCount { get; init; }
}
And specific records for different events, for example:
public record DataProcessingRequestedEvent : BaseMessage
{
public Guid ProcessDataId { get; init; }
public override string MessageName => $"{nameof(DataProcessingRequestedEvent)}";
}
Azure Function Implementation:
[Function("MessageBrokerFunction")]
public void Run([QueueTrigger("message-broker", Connection = "StorageQueue")] QueueMessage queueMessage)
{
var anyEventResult = AnyMessage.Create(queueMessage.MessageId, queueMessage.MessageText);
if (anyEventResult.IsFailed)
{
_logger.LogError($"Failed to parse message: {anyEventResult.Errors}");
return;
}
_messageBroker.Handle(anyEventResult.Value);
}
To remove dependency on queue-specific types, I convert QueueMessage into an AnyMessage class:
public class AnyMessage
{
public string MessageId { get; init; }
public JObject MessageBody { get; init; }
public string MessageName { get; init; }
public static Result<AnyMessage> Create(string messageId, string messageBody)
{
try
{
var parsedMessageBody = JObject.Parse(messageBody);
var messageName = parsedMessageBody.Properties()
.FirstOrDefault(p => string.Equals(p.Name, nameof(BaseMessage.MessageName), StringComparison.OrdinalIgnoreCase))
?.Value?.ToString();
if (string.IsNullOrEmpty(messageName))
{
return Result.Fail(new MissingMessageTypeError(messageBody));
}
return Result.Ok(new AnyMessage
{
MessageId = messageId,
MessageBody = parsedMessageBody,
MessageName = messageName
});
}
catch (Exception ex)
{
return Result.Fail(new ExceptionOccuredWhileParsingMessageError(messageBody, ex));
}
}
}
Event Handling and Dispatching:
public interface IEventHandler<T> where T : BaseMessage
{
Task Handle(T eventMessage, EventExecutionContext context);
}
public interface IEventDispatcher
{
Task<EventExecutionResult> Dispatch<T>(T message, EventExecutionContext context) where T : BaseMessage;
}
public class EventDispatcher : IEventDispatcher
{
private readonly ILogger<EventDispatcher> _logger;
private readonly IServiceProvider _serviceProvider;
public EventDispatcher(ILogger<EventDispatcher> logger, IServiceProvider serviceProvider)
{
_logger = logger;
_serviceProvider = serviceProvider;
}
public async Task<EventExecutionResult> Dispatch<T>(T message, EventExecutionContext context) where T : BaseMessage
{
try
{
var handlerType = typeof(IEventHandler<>).MakeGenericType(typeof(T));
var handler = _serviceProvider.GetService(handlerType);
var handleMethod = handler.GetType().GetMethod("Handle");
var task = (Task)handleMethod.Invoke(handler, new object[] { message, context });
await task.ConfigureAwait(false);
return new EventExecutionResult();
}
catch (Exception ex)
{
// TODO: Handle exceptions appropriately
_logger.LogError(ex, "Error dispatching event.");
return new EventExecutionResult();
}
}
}
The Core Issue: The challenge lies in determining where to convert the AnyMessage into a specific event type and then invoke the event dispatcher.
One approach is to introduce an additional layer, such as an IMessagePreProcessor for each MessageName. This would receive the AnyMessage, deserialize it into the specific event type, and then invoke the EventDispatcher.
However, I'm wondering if there is a better, more dynamic solution that:
Doesn't rely on compile-time types Can dynamically deserialize the message and invoke the appropriate event handler at runtime
Any suggestions or recommendations would be greatly appreciated!
Share Improve this question asked yesterday j.arapj.arap 1151 silver badge13 bronze badges1 Answer
Reset to default 1Doesn't rely on compile-time types Can dynamically deserialize the message and invoke the appropriate event handler at runtime
To do this becomes highly CPU intensive, depending on your chosen serialization protocol. Xml is suited for this as it contains schema information embedded or referenced from the file itself, if you are using JSON then you will have to parse the file with a JSON reader or include scheama information in the JSON file itself.
- Especially with JSON, deserializing into an untyped structure is intensive, deserializing into a generic type that contains all possible fields is also CPU intensive but as a code model is easy to lose data if the schema is not updated as the code evolves.
What are you trying to achieve? Do you want the code to be more beautiful or more theoretically best practise? Are you trying to save CPU cycles? Are you trying to reduce costs? Are you trying to achieve more efficient overall throughput?
To remove dependency on queue-specific types
Why would you want to do this? Why do you want 1 queue with a common or base message implementation at all? It sounds like a good idea from an OO point of view, but you are deliberately creating a bottleneck in the processing pipeline and are losing efficiencies that can be gained from the infrastructure itself.
There is a lot of power in queue specific types, it means you can optimise the processors for each queue to specifically the dependencies that they need. It is the first step towards a proper distributed processing solution and a good step towards a microservice implementation. It means you can more easily upgrade individual queues and message types to different architecture and or runtimes.
One approach is to introduce an additional layer, such as an IMessagePreProcessor for each MessageName. This would receive the AnyMessage, deserialize it into the specific event type, and then invoke the EventDispatcher.
If you do this, you are effectively creating a custom software solution around creating separate infrastructure queues. It is counter-intuitive to think of it like this, what you are actually doing is more tightly coupled, all of your processing is likely to be highly dependent on the rest of your solution. This is a very monolithic way to solve a very common problem.
I'm wondering if there is a better, more dynamic solution
This is the monolith talking, what you should be striving for is a more declarative solution as it gives you more flexibility at scale.
In general I suggest conversion as close to the edge as is practical, then as your solution matures and starts generating more messages you can more easily refactor out certain message types to other infrastructure including enterprise grade service bus and or event hub/grid implementations.
Especially if different message types have different latencies in how they are processed, I always suggest different queues as a first step. On ingestion, based on the type of the message, send the message to it's own queue. This will be disruptive if your system design requires the messages to be processed sequentially across the whole system, but if you can accept eventual consistency then overall throughput and debugging will be simpler if the handlers are bound to their own queues, it means individual queues can fail and have poison or dead-lettering configured, it also means they can spin up with less cold start latency if you any need to include the dependencies that they need to process that one type of message.
A common pattern for high volume processing is to use an Event Hub to ingest all of the messages and have a process to validate, transform and re-route messages to specific queues. This could be done using no-code configuration, stream analytics or a Function dedicated to re-routing the messages.
- I am not saying start with Event Hubs today, but by moving to different queues first makes it easy to transition to a high volume event ingestion service like Event Hubs when you need to.
- While initially you might host all processing functions within the same project, it is very easy to move individual functions out to their own project and deployment target and still be processing from the same queue.