Dynamic Message Deserialization
Overview
Brighter supports dynamic type resolution, allowing you to route multiple message types through a single channel. Instead of determining the message type at compile-time through generic parameters, you can use content-based routing where the message type is determined at runtime from metadata.
This enables more flexible messaging patterns while maintaining type safety once the message type is resolved.
DataType Channel Pattern (Default)
By default, Brighter uses the DataType Channel pattern from Enterprise Integration Patterns. In this pattern, each channel carries messages of a single, known type.
DataType Channel Example
// DataType Channel - One type per subscription
var subscription = new KafkaSubscription<TaskCreated>(
new SubscriptionName("paramore.example.tasks"),
channelName: new ChannelName("task.created"),
routingKey: new RoutingKey("task.created"),
groupId: "task-processor",
timeOut: TimeSpan.FromMilliseconds(100)
);
Characteristics:
Simple and straightforward
Type-safe at compile-time
One handler per channel
Recommended for most scenarios
Requires separate channel per message type
Cannot handle message type evolution on same channel
When to use DataType Channel:
You have distinct topics/queues for each message type
Message types are stable and don't evolve frequently
You want compile-time type safety
Simple producer-consumer patterns
This is Brighter's default and recommended approach for most scenarios.
Dynamic Message Deserialization
Dynamic message deserialization allows multiple message types on the same channel by resolving the type at runtime based on message metadata.
When to Use Dynamic Deserialization
Dynamic deserialization is useful when:
Multiple related message types share a single topic/queue
Message type evolution - new message types added to existing channels
CloudEvents-based routing - using the CloudEvents type attribute
Content-based routing - routing decisions based on message content
Shared infrastructure - multiple teams publishing to common topics
How It Works
Instead of specifying the type via a generic parameter, you provide a getRequestType callback in your Subscription that examines the message and returns the appropriate type:
var subscription = new KafkaSubscription(
new SubscriptionName("paramore.example.tasks"),
channelName: new ChannelName("task.state"),
routingKey: new RoutingKey("task.update"),
getRequestType: message => /* return type based on message */,
groupId: "task-processor",
timeOut: TimeSpan.FromMilliseconds(100)
);
Using CloudEvents Type for Routing
The most common approach for dynamic deserialization is using the CloudEvents type attribute. This provides a standard, interoperable way to identify message types.
CloudEvents Type Routing Example
var subscription = new KafkaSubscription(
new SubscriptionName("paramore.example.taskstate"),
channelName: new ChannelName("task.state"),
routingKey: new RoutingKey("task.update"),
getRequestType: message => message.Header.Type switch
{
var t when t == new CloudEventsType("io.goparamore.task.created")
=> typeof(TaskCreated),
var t when t == new CloudEventsType("io.goparamore.task.updated")
=> typeof(TaskUpdated),
var t when t == new CloudEventsType("io.goparamore.task.completed")
=> typeof(TaskCompleted),
_ => throw new ArgumentException(
$"No type mapping found for message with CloudEvents type {message.Header.Type}",
nameof(message)
)
},
groupId: "kafka-TaskProcessor-Sample",
timeOut: TimeSpan.FromMilliseconds(100),
offsetDefault: AutoOffsetReset.Earliest,
commitBatchSize: 5,
sweepUncommittedOffsetsInterval: TimeSpan.FromMilliseconds(10000),
messagePumpType: MessagePumpType.Proactor
);
How it works:
Message arrives on the task.update channel
Brighter populates message.Header.Type (the CloudEvents type attribute)
The callback matches the CloudEvents type to a Request type
Brighter deserializes the message to the correct Request type
Brighter routes the request to the appropriate handler based on its type
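The matching step above can also be expressed as a lookup table instead of a switch expression. The following is a minimal sketch that uses only the .NET base class library: TaskTypeMap, its Resolve method, and the stand-in record types are hypothetical names for illustration and are not part of Brighter. It assumes you already have the CloudEvents type as a string; how you obtain that string from message.Header.Type is not shown here.

```csharp
using System;
using System.Collections.Generic;

// Usage: resolve a known type, then show the failure for an unmapped one.
Console.WriteLine(TaskTypeMap.Resolve("io.goparamore.task.created").Name); // TaskCreated

try
{
    TaskTypeMap.Resolve("io.goparamore.task.archived");
}
catch (ArgumentException ex)
{
    // The message lists every supported type, built from the map itself.
    Console.WriteLine(ex.Message);
}

// Stand-in request types (assumption); real ones would be Brighter requests.
public record TaskCreated(string TaskId);
public record TaskUpdated(string TaskId);
public record TaskCompleted(string TaskId);

public static class TaskTypeMap
{
    // Hypothetical lookup table keyed by the CloudEvents type string.
    private static readonly IReadOnlyDictionary<string, Type> Map =
        new Dictionary<string, Type>(StringComparer.Ordinal)
        {
            ["io.goparamore.task.created"] = typeof(TaskCreated),
            ["io.goparamore.task.updated"] = typeof(TaskUpdated),
            ["io.goparamore.task.completed"] = typeof(TaskCompleted),
        };

    public static Type Resolve(string cloudEventsType)
    {
        // Guard against null first: dictionary lookups reject null keys.
        if (cloudEventsType is not null && Map.TryGetValue(cloudEventsType, out var type))
            return type;

        throw new ArgumentException(
            $"No type mapping found for CloudEvents type '{cloudEventsType}'. " +
            $"Supported types: {string.Join(", ", Map.Keys)}",
            nameof(cloudEventsType));
    }
}
```

A table like this keeps the list of supported types in one place, so the error message cannot drift out of date when a mapping is added.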
Setting CloudEvents Type on Publication
On the producer side, set the CloudEvents type in your Publication:
var publications = new[]
{
new Publication
{
Topic = new RoutingKey("task.update"),
RequestType = typeof(TaskCreated),
Source = new Uri("https://example.com/tasks"),
Type = new CloudEventsType("io.goparamore.task.created")
},
new Publication
{
Topic = new RoutingKey("task.update"),
RequestType = typeof(TaskUpdated),
Source = new Uri("https://example.com/tasks"),
Type = new CloudEventsType("io.goparamore.task.updated")
},
new Publication
{
Topic = new RoutingKey("task.update"),
RequestType = typeof(TaskCompleted),
Source = new Uri("https://example.com/tasks"),
Type = new CloudEventsType("io.goparamore.task.completed")
}
};
services.AddBrighter(options => { })
.AddProducers(configure =>
{
configure.ProducerRegistry = new KafkaProducerRegistryFactory(
new KafkaMessagingGatewayConfiguration { /* ... */ },
publications
).Create();
});
All three message types go to the same task.update topic, distinguished by their CloudEvents type.
Custom Routing Strategies
While CloudEvents type is recommended, you can implement any routing strategy by examining message properties.
Routing by Custom Header
var subscription = new RmqSubscription(
new SubscriptionName("paramore.example.orders"),
channelName: new ChannelName("orders"),
routingKey: new RoutingKey("orders"),
getRequestType: message =>
{
// Route based on custom header
if (message.Header.Bag.TryGetValue("OrderType", out var orderType))
{
return orderType switch
{
"Create" => typeof(CreateOrder),
"Update" => typeof(UpdateOrder),
"Cancel" => typeof(CancelOrder),
_ => throw new ArgumentException($"Unknown order type: {orderType}")
};
}
throw new ArgumentException("OrderType header not found");
},
timeOut: TimeSpan.FromMilliseconds(100)
);
Routing by Message Body Content
var subscription = new AzureServiceBusSubscription(
new SubscriptionName("paramore.example.events"),
channelName: new ChannelName("events"),
routingKey: new RoutingKey("events"),
getRequestType: message =>
{
// Parse JSON to determine type
using var doc = JsonDocument.Parse(message.Body.Value);
var root = doc.RootElement;
if (root.TryGetProperty("eventType", out var eventType))
{
return eventType.GetString() switch
{
"UserCreated" => typeof(UserCreated),
"UserUpdated" => typeof(UserUpdated),
"UserDeleted" => typeof(UserDeleted),
_ => throw new ArgumentException($"Unknown event type: {eventType}")
};
}
throw new ArgumentException("eventType property not found in message body");
},
timeOut: TimeSpan.FromMilliseconds(100)
);
Note: Parsing the body for routing is less efficient than using headers, but can be useful when integrating with systems that don't support custom headers.
Handler Routing
Once the request type is resolved, Brighter routes the message to the appropriate handler using its standard handler resolution:
Standard 1-to-1 Handler Mapping
services.AddBrighter(options => { })
.AddConsumers(options =>
{
options.Subscriptions = new[] { subscription };
})
.Handlers(registry =>
{
// Each message type routes to its handler
registry.Register<TaskCreated, TaskCreatedHandler>();
registry.Register<TaskUpdated, TaskUpdatedHandler>();
registry.Register<TaskCompleted, TaskCompletedHandler>();
});
With dynamic deserialization:
Message arrives on the task.update channel
The getRequestType callback returns typeof(TaskCreated)
Brighter deserializes to TaskCreated
Brighter routes to TaskCreatedHandler
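To make the last two steps concrete, here is a simplified model of deserializing to a type that is only known at runtime and then dispatching on it. This is an illustration under stated assumptions, not Brighter's internal implementation: MiniDispatcher, its handler table, and the record types are hypothetical, and System.Text.Json stands in for whatever message mapper you have configured.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Usage: the type arrives as a runtime value, yet the handler sees a typed object.
MiniDispatcher.Dispatch(typeof(TaskCreated), "{\"TaskId\":\"42\"}"); // prints "created 42"

// Stand-in request types (assumption).
public record TaskCreated(string TaskId);
public record TaskUpdated(string TaskId);

public static class MiniDispatcher
{
    // Hypothetical handler table: one handler per request type (1-to-1 mapping).
    private static readonly Dictionary<Type, Action<object>> Handlers = new()
    {
        [typeof(TaskCreated)] = r => Console.WriteLine($"created {((TaskCreated)r).TaskId}"),
        [typeof(TaskUpdated)] = r => Console.WriteLine($"updated {((TaskUpdated)r).TaskId}"),
    };

    public static object Dispatch(Type requestType, string body)
    {
        // Deserialize using the Type object chosen at runtime, not a generic parameter.
        var request = JsonSerializer.Deserialize(body, requestType)
            ?? throw new InvalidOperationException("Body deserialized to null");

        if (!Handlers.TryGetValue(requestType, out var handler))
            throw new InvalidOperationException($"No handler registered for {requestType.Name}");

        handler(request);
        return request;
    }
}
```

The point of the sketch is that type safety returns as soon as the type is resolved: each handler receives an instance of its own request type.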
Integration with Agreement Dispatcher
Dynamic message deserialization can be combined with Agreement Dispatcher for even more flexible routing:
// First: Resolve message type dynamically
var subscription = new KafkaSubscription(
new SubscriptionName("paramore.example.orders"),
channelName: new ChannelName("orders"),
routingKey: new RoutingKey("orders"),
getRequestType: message => message.Header.Type switch
{
var t when t == new CloudEventsType("com.example.order.created")
=> typeof(OrderCreated),
_ => throw new ArgumentException($"Unknown type: {message.Header.Type}")
},
// ... other config
);
// Second: Dynamically choose handler based on content
services.AddBrighter(options => { })
.AddConsumers(options => { options.Subscriptions = new[] { subscription }; })
.Handlers(registry =>
{
registry.Register<OrderCreated>((request, context) =>
{
var order = request as OrderCreated;
// Route to different handlers based on order properties
if (order?.Country == "US")
return [typeof(USOrderCreatedHandler)];
if (order?.Country == "UK")
return [typeof(UKOrderCreatedHandler)];
return [typeof(DefaultOrderCreatedHandler)];
},
[
typeof(USOrderCreatedHandler),
typeof(UKOrderCreatedHandler),
typeof(DefaultOrderCreatedHandler)
]);
});
This provides two levels of routing:
Dynamic deserialization: CloudEvents type → OrderCreated
Agreement dispatcher: Order content → Country-specific handler
Performance Considerations
Dynamic message deserialization has a small performance overhead compared to DataType Channel:
Runtime Type Resolution
DataType Channel (Compile-Time):
Type known at compile-time via generic parameter
Message mapper pipeline pre-built
Minimal runtime overhead
Dynamic Deserialization (Runtime):
Type determined by executing callback function
Message mapper pipeline built on first use per type
Pipeline cached for subsequent messages of same type
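The "built on first use, cached afterwards" behaviour can be pictured with a small per-type cache. This is only a sketch of the general technique: PipelineCache is a hypothetical name, a Func<string, string> stands in for a real mapper pipeline, and nothing here is taken from Brighter's source.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Usage: the second lookup for the same type returns the cached instance.
var first = PipelineCache.GetOrBuild(typeof(int));
var second = PipelineCache.GetOrBuild(typeof(int));
Console.WriteLine(ReferenceEquals(first, second)); // True
Console.WriteLine(PipelineCache.BuildCount);       // 1

public static class PipelineCache
{
    // Lazy<T> ensures the expensive build runs once per type, even under contention.
    private static readonly ConcurrentDictionary<Type, Lazy<Func<string, string>>> Cache = new();
    private static int _buildCount;

    public static int BuildCount => _buildCount;

    public static Func<string, string> GetOrBuild(Type requestType) =>
        Cache.GetOrAdd(requestType, t => new Lazy<Func<string, string>>(() => Build(t))).Value;

    private static Func<string, string> Build(Type requestType)
    {
        // Stand-in for the one-off cost of assembling a pipeline.
        Interlocked.Increment(ref _buildCount);
        return body => $"{requestType.Name}:{body}";
    }
}
```

Under this model only the first message of each type pays the build cost; every later message pays for the callback and a dictionary lookup.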
Configuration Examples
Kafka with CloudEvents Routing
var subscription = new KafkaSubscription(
new SubscriptionName("paramore.example.inventory"),
channelName: new ChannelName("inventory.events"),
routingKey: new RoutingKey("inventory"),
getRequestType: message => message.Header.Type switch
{
var t when t == new CloudEventsType("com.example.inventory.itemadded")
=> typeof(ItemAdded),
var t when t == new CloudEventsType("com.example.inventory.itemremoved")
=> typeof(ItemRemoved),
var t when t == new CloudEventsType("com.example.inventory.stockadjusted")
=> typeof(StockAdjusted),
_ => throw new ArgumentException(
$"Unmapped CloudEvents type: {message.Header.Type}",
nameof(message)
)
},
groupId: "inventory-processor",
timeOut: TimeSpan.FromMilliseconds(100),
offsetDefault: AutoOffsetReset.Earliest,
messagePumpType: MessagePumpType.Proactor
);
services.AddBrighter(options => { })
.AddConsumers(options =>
{
options.Subscriptions = new[] { subscription };
})
.Handlers(registry =>
{
registry.Register<ItemAdded, ItemAddedHandler>();
registry.Register<ItemRemoved, ItemRemovedHandler>();
registry.Register<StockAdjusted, StockAdjustedHandler>();
})
.AutoFromAssemblies([typeof(ItemAdded).Assembly]);
RabbitMQ with CloudEvents Routing
var subscription = new RmqSubscription(
new SubscriptionName("paramore.example.notifications"),
channelName: new ChannelName("notifications"),
routingKey: new RoutingKey("notifications.#"), // Wildcard routing
getRequestType: message => message.Header.Type switch
{
var t when t == new CloudEventsType("com.example.email.sent")
=> typeof(EmailSent),
var t when t == new CloudEventsType("com.example.sms.sent")
=> typeof(SmsSent),
var t when t == new CloudEventsType("com.example.push.sent")
=> typeof(PushNotificationSent),
_ => throw new ArgumentException(
$"Unknown notification type: {message.Header.Type}",
nameof(message)
)
},
timeOut: TimeSpan.FromMilliseconds(100),
messagePumpType: MessagePumpType.Proactor
);
AWS SQS with CloudEvents Routing
var subscription = new SqsSubscription(
new SubscriptionName("paramore.example.orders"),
channelName: new ChannelName("orders"),
routingKey: new RoutingKey("orders"),
getRequestType: message => message.Header.Type switch
{
var t when t == new CloudEventsType("com.example.order.placed")
=> typeof(OrderPlaced),
var t when t == new CloudEventsType("com.example.order.shipped")
=> typeof(OrderShipped),
var t when t == new CloudEventsType("com.example.order.delivered")
=> typeof(OrderDelivered),
_ => throw new ArgumentException(
$"Unrecognized order event: {message.Header.Type}",
nameof(message)
)
},
bufferSize: 10,
timeOut: TimeSpan.FromMilliseconds(100),
lockTimeout: TimeSpan.FromSeconds(30)
);
Best Practices
1. Use CloudEvents Type for Routing
CloudEvents provides a standard, interoperable way to identify message types:
// Good - Standard CloudEvents type
getRequestType: message => message.Header.Type switch
{
var t when t == new CloudEventsType("com.example.order.created")
=> typeof(OrderCreated),
// ...
}
// Bad - Custom header parsing
getRequestType: message =>
{
var type = message.Header.Bag["MessageType"];
// Non-standard approach
}
2. Provide Comprehensive Type Mappings
Handle all expected message types and provide a clear error for unmapped types:
// Good - Clear error message
getRequestType: message => message.Header.Type switch
{
var t when t == new CloudEventsType("com.example.task.created")
=> typeof(TaskCreated),
var t when t == new CloudEventsType("com.example.task.updated")
=> typeof(TaskUpdated),
_ => throw new ArgumentException(
$"No type mapping found for CloudEvents type '{message.Header.Type}'. " +
$"Supported types: com.example.task.created, com.example.task.updated",
nameof(message)
)
}
// Bad - Generic error
getRequestType: message => message.Header.Type switch
{
var t when t == new CloudEventsType("com.example.task.created")
=> typeof(TaskCreated),
_ => throw new Exception("Unknown message type")
}
3. Use Meaningful CloudEvents Types
Follow reverse-DNS naming for CloudEvents types:
// Good - Reverse DNS, hierarchical
new CloudEventsType("io.goparamore.task.created")
new CloudEventsType("com.example.inventory.item.added")
// Bad - Too generic
new CloudEventsType("TaskCreated")
// Bad - Not following conventions
new CloudEventsType("CREATE_TASK")
See CloudEvents Support for more on CloudEvents type naming.
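If you want to enforce the naming convention in a unit test or at startup, a simple pattern check is one option. The sketch below is an assumption about what "reverse-DNS" should mean for your team (lowercase, dot-separated, at least three segments); it is not a rule taken from the CloudEvents specification or from Brighter, and CloudEventsTypeName is a hypothetical helper.

```csharp
using System;
using System.Text.RegularExpressions;

// Usage: check the names from the examples above.
foreach (var name in new[] { "io.goparamore.task.created", "TaskCreated", "CREATE_TASK" })
    Console.WriteLine($"{name}: {CloudEventsTypeName.LooksReverseDns(name)}");
// io.goparamore.task.created: True
// TaskCreated: False
// CREATE_TASK: False

public static class CloudEventsTypeName
{
    // Assumed convention: lowercase segments separated by dots, three or more segments.
    private static readonly Regex ReverseDns =
        new(@"^[a-z][a-z0-9-]*(\.[a-z][a-z0-9-]*){2,}\z", RegexOptions.CultureInvariant);

    public static bool LooksReverseDns(string typeName) =>
        !string.IsNullOrEmpty(typeName) && ReverseDns.IsMatch(typeName);
}
```

Adjust the pattern to your own naming policy; the value lies in checking names once, centrally, rather than discovering an inconsistent type string in production.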
4. Consider DataType Channel First
Start with DataType Channel (one type per topic) unless you have a specific need for dynamic deserialization:
// Good - Simple DataType Channel when possible
var subscription = new KafkaSubscription<TaskCreated>(
// ...
);
Only use dynamic deserialization when needed:
Multiple related types on same channel
Message evolution scenarios
CloudEvents-based integration
5. Cache Performance-Critical Paths
If performance is critical, pre-warm the pipeline cache:
// Send one message of each type at startup to warm caches
await _commandProcessor.PublishAsync(new TaskCreated { /* ... */ });
await _commandProcessor.PublishAsync(new TaskUpdated { /* ... */ });
await _commandProcessor.PublishAsync(new TaskCompleted { /* ... */ });
// Subsequent messages will use cached pipelines
6. Document Type Mappings
Document which CloudEvents types map to which Request types:
/// <summary>
/// Subscription for task state changes.
/// Supports the following CloudEvents types:
/// - io.goparamore.task.created → TaskCreated
/// - io.goparamore.task.updated → TaskUpdated
/// - io.goparamore.task.completed → TaskCompleted
/// </summary>
var subscription = new KafkaSubscription(
// ...
getRequestType: message => message.Header.Type switch
{
// ...
}
);
Comparison: DataType Channel vs Dynamic Deserialization
| Aspect | DataType Channel | Dynamic Deserialization |
| --- | --- | --- |
| Type Resolution | Compile-time (generic) | Runtime (callback) |
| Performance | Fastest | Fast |
| Flexibility | One type per channel | Multiple types per channel |
| Type Safety | Compile-time | Runtime (after resolution) |
| Setup Complexity | Simple | Moderate |
| Message Evolution | Requires new channels | Same channel |
| CloudEvents Integration | Not needed | Natural fit |
| When to Use | Default, most scenarios | Multiple types, evolution |
Error Handling
Handle unmapped message types gracefully:
var subscription = new KafkaSubscription(
new SubscriptionName("paramore.example.tasks"),
channelName: new ChannelName("task.events"),
routingKey: new RoutingKey("task.events"),
getRequestType: message =>
{
try
{
return message.Header.Type switch
{
var t when t == new CloudEventsType("io.goparamore.task.created")
=> typeof(TaskCreated),
var t when t == new CloudEventsType("io.goparamore.task.updated")
=> typeof(TaskUpdated),
_ => throw new ArgumentException(
$"Unmapped CloudEvents type: {message.Header.Type}. " +
$"Message ID: {message.Id}",
nameof(message)
)
};
}
catch (Exception ex)
{
// Log the error with full message context
_logger.LogError(ex,
"Failed to resolve message type. MessageId: {MessageId}, Type: {Type}",
message.Id,
message.Header.Type);
throw;
}
},
// ... other config
);
Failed messages will go to the dead letter queue based on your failure handling configuration.
Further Reading
Cloud Events Support - Understanding CloudEvents in Brighter
Agreement Dispatcher - Dynamic handler selection
Default Message Mappers - Automatic message mapping
Routing - Message routing in Brighter
Sample Code
Full working examples can be found in the Brighter samples:
Dynamic Deserialization: Brighter/samples/TaskQueue/ - Examples using CloudEvents type routing
Multi-type Channels: Brighter/samples/MultiBus/ - Multiple message types on shared infrastructure