Reactor and Proactor: Concurrency Models
Brighter V10 introduces clearer terminology for its concurrency models: Reactor and Proactor.
Overview
When consuming messages from external message brokers, Brighter uses a Performer (the message pump) to retrieve messages and dispatch them to handlers. The Performer is single-threaded in both concurrency models, but the way it handles I/O differs significantly.
Reactor Pattern
The Reactor pattern uses blocking I/O with synchronous operations. When the Performer makes a synchronous call (e.g., to retrieve a message), it blocks the thread until the operation completes. This also blocks the message pump. Whilst this might seem negative, blocking the pump ensures that messages will be processed sequentially on a single thread.
With a Reactor, your handlers, mappers and middleware should be synchronous.
Characteristics:
Blocking I/O operations
No context switching during I/O
Faster performance per operation (lower latency)
Thread remains busy during I/O
Uses MessagePumpType.Reactor
When to use Reactor:
When minimizing latency is critical
When throughput requirements are moderate
When you have sufficient threads available
For traditional synchronous workloads
Proactor Pattern
The Proactor pattern uses non-blocking I/O with asynchronous operations. When the Performer makes an asynchronous I/O call, it yields the thread until the operation completes. Brighter has its own synchronization context, so work resumes on Brighter's thread when the I/O completes, unless you use ConfigureAwait(false), which lets the continuation run on a thread pool thread instead.
With a Proactor, your handlers and middleware should be async. Message mappers are covered under Handler and Mapper Requirements.
Characteristics:
Non-blocking I/O operations
Thread yielded during I/O (context switch occurs)
Better throughput (more operations per thread)
Higher per-operation latency due to context switching
Uses MessagePumpType.Proactor
When to use Proactor:
When maximizing throughput is critical
In container environments with limited threads
With competing consumers pattern
When processing high volumes of messages
When I/O wait time is significant
Performance vs. Throughput Trade-offs
| Aspect | Reactor | Proactor |
| --- | --- | --- |
| Latency | Lower (no context switch) | Higher (context switch overhead) |
| Throughput | Lower (thread stays busy) | Higher (thread yielded during I/O) |
| Thread Efficiency | One thread per pump | Multiple pumps can share threads |
| Resource Usage | More threads needed | Fewer threads needed |
| Best For | Low latency, moderate volume | High volume, container environments |
Handler and Mapper Requirements
Critical: Your choice of Reactor or Proactor determines which handler and mapper types you must use. Mixing sync and async implementations will cause runtime errors.
Reactor Pattern Requirements
When using MessagePumpType.Reactor, you must use synchronous implementations:
Handlers
Implement IHandleRequests<T> (not IHandleRequestsAsync<T>):
    public class MyCommandHandler : RequestHandler<MyCommand>
    {
        public override MyCommand Handle(MyCommand command)
        {
            // Synchronous business logic
            _repository.Save(command.Data);
            return base.Handle(command);
        }
    }
Message Mappers
Use synchronous MapToMessage and MapToRequest methods:
    public class MyCommandMessageMapper : IAmAMessageMapper<MyCommand>
    {
        public Message MapToMessage(MyCommand request, Publication publication)
        {
            var header = new MessageHeader(
                messageId: request.Id,
                topic: publication.Topic,
                messageType: MessageType.MT_COMMAND
            );
            var body = new MessageBody(JsonSerializer.Serialize(request));
            return new Message(header, body);
        }
        public MyCommand MapToRequest(Message message)
        {
            return JsonSerializer.Deserialize<MyCommand>(message.Body.Value);
        }
    }
Middleware/Attributes
Use synchronous handler attributes and middleware:
    public class MyCommandHandler : RequestHandler<MyCommand>
    {
        [UsePolicy("RetryPolicy", step: 1)] // Synchronous policy
        [RequestLogging(step: 0, timing: HandlerTiming.Before)] // Synchronous logging
        public override MyCommand Handle(MyCommand command)
        {
            // Handler logic
            return base.Handle(command);
        }
    }
Proactor Pattern Requirements
When using MessagePumpType.Proactor, you must use asynchronous implementations:
Handlers
Implement IHandleRequestsAsync<T> (not IHandleRequests<T>):
    public class MyCommandHandlerAsync : RequestHandlerAsync<MyCommand>
    {
        public override async Task<MyCommand> HandleAsync(
            MyCommand command,
            CancellationToken cancellationToken = default)
        {
            // Asynchronous business logic
            await _repository.SaveAsync(command.Data, cancellationToken);
            return await base.HandleAsync(command, cancellationToken);
        }
    }
Message Mappers
Message mappers remain synchronous (they don't perform I/O), but the mapper is called from an async context:
    public class MyCommandMessageMapper : IAmAMessageMapper<MyCommand>
    {
        // Same synchronous implementation as Reactor
        public Message MapToMessage(MyCommand request, Publication publication)
        {
            var header = new MessageHeader(
                messageId: request.Id,
                topic: publication.Topic,
                messageType: MessageType.MT_COMMAND
            );
            var body = new MessageBody(JsonSerializer.Serialize(request));
            return new Message(header, body);
        }
        public MyCommand MapToRequest(Message message)
        {
            return JsonSerializer.Deserialize<MyCommand>(message.Body.Value);
        }
    }
Note: Message mappers don't have async variants because they typically don't perform I/O operations; they just transform data structures. If your mapper needs to perform async I/O (e.g., reading from a claim check store), use a custom mapper with synchronous wrapper methods that call Task.Run() or similar.
Middleware/Attributes
Use asynchronous handler attributes and middleware:
    public class MyCommandHandlerAsync : RequestHandlerAsync<MyCommand>
    {
        [UseResiliencePipeline("RetryPipeline", step: 1)] // Async resilience pipeline
        [RequestLoggingAsync(step: 0, timing: HandlerTiming.Before)] // Async logging
        public override async Task<MyCommand> HandleAsync(
            MyCommand command,
            CancellationToken cancellationToken = default)
        {
            // Handler logic
            return await base.HandleAsync(command, cancellationToken);
        }
    }
What Happens If You Mix Them?
Mixing sync and async implementations causes runtime errors:
| Pump Type | Handler Type | Result |
| --- | --- | --- |
| MessagePumpType.Reactor | IHandleRequests<T> | Works correctly |
| MessagePumpType.Reactor | IHandleRequestsAsync<T> | Runtime error: Reactor pump cannot dispatch to async handlers |
| MessagePumpType.Proactor | IHandleRequestsAsync<T> | Works correctly |
| MessagePumpType.Proactor | IHandleRequests<T> | Runtime error: Proactor pump cannot dispatch to sync handlers |
Best Practice: Be consistent throughout your entire pipeline:
Reactor → All synchronous handlers, middleware, and policies
Proactor → All asynchronous handlers, middleware, and policies
Checking Your Implementation
To verify your handlers match your pump type:
    // Reactor - Check for IHandleRequests<T>
    public class MyHandler : RequestHandler<MyCommand> // Correct base class
    {
        public override MyCommand Handle(MyCommand command) // Sync method
        {
            return base.Handle(command);
        }
    }
    // Proactor - Check for IHandleRequestsAsync<T>
    public class MyHandlerAsync : RequestHandlerAsync<MyCommand> // Correct base class
    {
        public override async Task<MyCommand> HandleAsync( // Async method
            MyCommand command,
            CancellationToken cancellationToken = default)
        {
            return await base.HandleAsync(command, cancellationToken);
        }
    }
Configuration
Reactor Configuration
Configure a subscription to use the Reactor pattern with MessagePumpType.Reactor:
    var subscription = new Subscription<MyCommand>(
        new SubscriptionName("my.subscription"),
        channelName: new ChannelName("my.channel"),
        routingKey: new RoutingKey("my.routing.key"),
        messagePumpType: MessagePumpType.Reactor, // Blocking I/O
        timeOut: TimeSpan.FromMilliseconds(200),
        makeChannels: OnMissingChannel.Create
    );
Proactor Configuration
Configure a subscription to use the Proactor pattern with MessagePumpType.Proactor:
    var subscription = new Subscription<MyCommand>(
        new SubscriptionName("my.subscription"),
        channelName: new ChannelName("my.channel"),
        routingKey: new RoutingKey("my.routing.key"),
        messagePumpType: MessagePumpType.Proactor, // Non-blocking I/O
        timeOut: TimeSpan.FromMilliseconds(200),
        makeChannels: OnMissingChannel.Create
    );
Transport Native Support
Different transports have varying levels of native support for synchronous and asynchronous operations. When a transport doesn't natively support an operation mode, Brighter adapts it:
| Transport | Reactor | Proactor | Notes |
| --- | --- | --- | --- |
| Azure Service Bus | Sync over Async | Native | Async API is native |
| AWS (SNS/SQS) | Sync over Async | Native | Async API is native |
| Kafka | Native | Async over Sync | Sync API is native |
| MQTT | Sync over Async/Event Based | Event Based | Event-driven architecture |
| MSSQL | Native | Native | Both APIs supported |
| PostgreSQL | Native | Native | Both APIs supported |
| RabbitMQ v6 | Sync over Async | Native | Async client recommended |
| RabbitMQ v7 | Sync over Async | Native | Async client recommended |
| Redis | Native | Native | Both APIs supported |
Key:
Native: Transport SDK provides native support
Sync over Async: Uses Task.Run() or similar to wrap async in sync
Async over Sync: Uses Task.FromResult() or similar to wrap sync in async
Event Based: Uses event-driven callbacks
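The two adapter styles in the key can be illustrated with a minimal sketch. It does not show Brighter's internal code: ReceiveAsync and Receive are hypothetical stand-ins for a transport SDK's async-only and sync-only calls, and AdapterSketch is an illustrative name.

```csharp
using System;
using System.Threading.Tasks;

public static class AdapterSketch
{
    // Hypothetical async-only client call (stand-in for a transport SDK).
    public static async Task<string> ReceiveAsync()
    {
        await Task.Delay(10);
        return "message";
    }

    // Hypothetical sync-only client call (stand-in for a transport SDK).
    public static string Receive() => "message";

    // Sync over async: the caller blocks until the async call finishes.
    // Task.Run starts the call on the thread pool, so its continuations
    // do not depend on the blocked caller's synchronization context.
    public static string ReceiveSyncOverAsync() =>
        Task.Run(() => ReceiveAsync()).GetAwaiter().GetResult();

    // Async over sync: the work still runs synchronously on the caller;
    // the result is only wrapped in an already-completed Task.
    public static Task<string> ReceiveAsyncOverSync() =>
        Task.FromResult(Receive());
}
```

Neither wrapper changes how the underlying client behaves. Sync over async still ties up the calling thread, and async over sync still blocks inside the call, which is one reason to prefer the mode a transport supports natively.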
The Performer (Message Pump)
The Performer is an instance of Brighter's MessagePump implementation that retrieves messages and dispatches them to handlers. Important characteristics:
Single-threaded by default in both Reactor and Proactor modes
Runs one message at a time through the handler pipeline
Provides a synchronization context for handler execution
Manages the message processing lifecycle
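To make the "one message at a time" behaviour concrete, here is a rough sketch of the two loop shapes. It is not Brighter's MessagePump: PumpSketch, the in-memory queue and channel, and the handler delegates are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PumpSketch
{
    // Reactor-style loop: the thread blocks inside GetConsumingEnumerable
    // while it waits for the next message.
    public static List<string> RunReactor(
        BlockingCollection<string> queue, Func<string, string> handle)
    {
        var handled = new List<string>();
        foreach (var message in queue.GetConsumingEnumerable())
            handled.Add(handle(message)); // one message at a time
        return handled;
    }

    // Proactor-style loop: the thread is released at each await, and the
    // loop resumes when the read or the handler completes.
    public static async Task<List<string>> RunProactorAsync(
        ChannelReader<string> reader, Func<string, Task<string>> handleAsync)
    {
        var handled = new List<string>();
        await foreach (var message in reader.ReadAllAsync())
            handled.Add(await handleAsync(message)); // still one at a time
        return handled;
    }
}
```

In both loops a message is fully handled before the next one is read, so ordering is preserved. The difference is only whether the thread waits or is released during I/O.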
ConfigureAwait(false) Warning
If you use ConfigureAwait(false) in your handler code when using the Proactor pattern, you will bypass Brighter's synchronization context and resume on a thread pool thread:
    public class MyCommandHandlerAsync : RequestHandlerAsync<MyCommand>
    {
        public override async Task<MyCommand> HandleAsync(MyCommand command, CancellationToken cancellationToken = default)
        {
            // This yields to the thread pool, ignoring the synchronization context
            await SomeOperationAsync().ConfigureAwait(false);
            return await base.HandleAsync(command, cancellationToken);
        }
    }
Important: Using ConfigureAwait(false) means:
Your handler continues on a thread pool thread (not the Performer's thread)
You lose the single-threaded guarantee
Multiple handlers could execute concurrently if you have multiple Performers
For more details on ConfigureAwait, see the official .NET documentation.
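The effect can be observed outside Brighter with a small experiment. This is a sketch under stated assumptions: SingleThreadContext is a hypothetical stand-in for a single-threaded pump context, not Brighter's own class, and ConfigureAwaitDemo is an illustrative name. It records the thread on which a continuation resumes, with and without the captured context.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical single-threaded context; not Brighter's implementation.
public sealed class SingleThreadContext : SynchronizationContext
{
    private readonly BlockingCollection<(SendOrPostCallback Callback, object? State)> _queue = new();

    // Continuations posted by 'await' are queued for the pump thread.
    public override void Post(SendOrPostCallback d, object? state) => _queue.Add((d, state));

    // Runs queued continuations on the calling thread until Complete() is called.
    public void RunOnCurrentThread()
    {
        foreach (var (callback, state) in _queue.GetConsumingEnumerable())
            callback(state);
    }

    public void Complete() => _queue.CompleteAdding();
}

public static class ConfigureAwaitDemo
{
    // Returns the pump thread's id and the id of the thread that ran the code after the await.
    public static (int Pump, int Resumed) Run(bool continueOnCapturedContext)
    {
        int pump = 0, resumed = 0;
        var thread = new Thread(() =>
        {
            var context = new SingleThreadContext();
            SynchronizationContext.SetSynchronizationContext(context);
            pump = Environment.CurrentManagedThreadId;

            async Task HandleAsync()
            {
                await Task.Delay(50).ConfigureAwait(continueOnCapturedContext);
                resumed = Environment.CurrentManagedThreadId;
            }

            // Stop the pump loop once the handler has finished.
            HandleAsync().ContinueWith(_ => context.Complete(), TaskScheduler.Default);
            context.RunOnCurrentThread();
        });
        thread.Start();
        thread.Join();
        return (pump, resumed);
    }
}
```

Calling ConfigureAwaitDemo.Run(true) should return two equal thread ids, because the continuation is posted back to the pump thread. Calling Run(false) should return different ids, because the continuation runs on a thread pool thread.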
Migration from V9 to V10
In Brighter V9, concurrency was configured using isAsync and runAsync flags. In V10, this has been simplified to the MessagePumpType enum.
V9 Configuration (Deprecated)
    // V9 - DEPRECATED
    var subscription = new Subscription<MyCommand>(
        // ... other parameters
        isAsync: true, // Was this an async handler?
        runAsync: true // Should we use async message pump?
    );
V10 Configuration (Current)
    // V10 - Use MessagePumpType
    var subscription = new Subscription<MyCommand>(
        // ... other parameters
        messagePumpType: MessagePumpType.Proactor // Clear and explicit
    );
Migration Steps
Remove isAsync and runAsync parameters from your subscriptions
Add messagePumpType parameter with appropriate value:
    Use MessagePumpType.Reactor if you had isAsync: false, runAsync: false (or both true but blocking code)
    Use MessagePumpType.Proactor if you had isAsync: true, runAsync: true with async handlers
Update handler implementations to match:
    MessagePumpType.Reactor → Use IHandleRequests<T> with synchronous Handle() method
    MessagePumpType.Proactor → Use IHandleRequestsAsync<T> with async HandleAsync() method
Choosing Between Reactor and Proactor
Use this decision guide to choose the right pattern:
Choose Reactor When:
You need the lowest possible latency per message
Message volume is moderate
Your handlers perform synchronous operations
You're working with transports that have native sync APIs (Kafka, MSSQL, Redis)
Choose Proactor When:
You need to maximize throughput
Your handlers perform significant I/O operations
You're working with transports that have native async APIs (AWS, Azure, RabbitMQ)
Example Scenarios
Low-Latency Trading System (Reactor):
    // Need fastest possible response time per trade
    var subscription = new Subscription<TradeCommand>(
        new SubscriptionName("trade.processor"),
        channelName: new ChannelName("trades"),
        routingKey: new RoutingKey("trade.execute"),
        messagePumpType: MessagePumpType.Reactor, // Minimize latency
        timeOut: TimeSpan.FromMilliseconds(100)
    );
High-Volume Order Processing (Proactor):
    // Need to process thousands of orders per second
    var subscription = new Subscription<OrderCommand>(
        new SubscriptionName("order.processor"),
        channelName: new ChannelName("orders"),
        routingKey: new RoutingKey("order.created"),
        messagePumpType: MessagePumpType.Proactor, // Maximize throughput
        timeOut: TimeSpan.FromMilliseconds(500)
    );
Best Practices
CRITICAL: Match your handler implementation to your MessagePumpType:
Reactor → Synchronous handlers (IHandleRequests<T>)
Proactor → Asynchronous handlers (IHandleRequestsAsync<T>)
Mixing these will cause runtime errors
See Handler and Mapper Requirements for details
Consider your transport's native support:
Prefer Proactor for AWS, Azure, RabbitMQ (native async)
Consider Reactor for Kafka (native sync)
Profile your application:
Measure actual latency and throughput
Test both patterns under load
Choose based on your specific requirements
Be cautious with ConfigureAwait(false):
Understand you're breaking the single-threaded guarantee
Use only when you need thread pool execution
Test thoroughly for race conditions
Start simple:
Begin with Proactor (it's generally more flexible)
Switch to Reactor only if latency measurements require it
Don't optimize prematurely
Related Documentation
How the Dispatcher Works - Dispatcher internals
Configuring the Dispatcher - Dispatcher configuration
Subscriptions - Subscription configuration
Message Pumps - Message pump implementation details
Performance Tuning - Optimization strategies
Summary
Reactor = Blocking I/O = Lower latency per operation = Moderate throughput = Requires synchronous handlers
Proactor = Non-blocking I/O = Higher latency per operation = Better throughput = Requires asynchronous handlers
Performer = Single-threaded message pump in both patterns
Handler Types MUST Match: Reactor uses IHandleRequests<T>, Proactor uses IHandleRequestsAsync<T>
ConfigureAwait(false) = Breaks single-threaded guarantee
Choose based on your requirements: Latency-sensitive? → Reactor. Throughput-sensitive? → Proactor
When in doubt: Start with Proactor (it's more flexible for most use cases)
