InMemory Options for Development and Testing
Overview
Brighter V10 provides a comprehensive suite of in-memory implementations for key components, making it easy to develop and test applications without external dependencies. These in-memory options replace databases, message brokers, and schedulers with simple, lightweight alternatives that run entirely in process.
Key Benefits:
Zero dependencies: No databases, message brokers, or external services required
Fast execution: Perfect for unit and integration tests
Simple configuration: Minimal setup, get started immediately
Consistent APIs: Same interfaces as production components
Deterministic behavior: Predictable, repeatable test execution
Important: InMemory options are designed for development and testing. While robust, they are generally not recommended for production due to lack of persistence, distribution, and durability guarantees.
Available InMemory Components
Brighter V10 provides InMemory implementations for the following components:
InMemory Transport
The InMemory Transport provides lightweight message publishing and consumption without requiring a message broker like RabbitMQ, Kafka, or AWS SQS. It consists of three replacements:
InternalBus An in memory collection of topics, and queues of messages to those topics. It implements
IAmABusand can be used from theInMemoryMessageProducerandInMemoryMessageConsumerto exchange a message.InMemoryMessageProducer An implementation of
IAmAMessageProducerSync,IAmAMessageProducerAsyncandIAmABulkMessageProducerAsyncthat produces message to topics on theInternalBus.InMemoryMessageConsumer An implementation of
IAmAMessageConsumerSyncandIAmAMessageConsumerAsyncthat consumes messages from topics on theInternalBus.
When to Use
Perfect for:
Unit testing command and event handlers
Integration testing without external dependencies
Local development and debugging
Demos and proof-of-concepts
Production Use Cases (limited):
Single-process applications with no distribution requirements
Internal message passing within a monolith
Scenarios where message loss is acceptable
Configuration
Internal Bus:
var internalBus = new InternalBus();Producer Configuration:
using Paramore.Brighter;
using Paramore.Brighter.Extensions.DependencyInjection;
var internalBus = new InternalBus();
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
})
.AddProducers(options =>
{
var publication = new Publication() { Topic = new RoutingKey("Topic") };
options.ProducerRegistry = new InMemoryProducerRegistryFactory(internalBus , new[] { publication }, InstrumentationOptions.All)
.Create();
})
.AutoFromAssemblies();Consumer Configuration:
var internalBus = new InternalBus();
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
})
.AddConsumers(options =>
{
options.Subscriptions = subscriptions;
options.ChannelFactory = new InMemoryChannelFactory(internalBus, TimeProvider.System);
})
.AutoFromAssemblies()
.AddHostedService<ServiceActivatorHostedService>();Complete Example
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
var internalBus = new InternalBus();
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
})
.AddProducers(options =>
{
var publication = new Publication() { Topic = new RoutingKey("GreetingMade") };
options.ProducerRegistry = new InMemoryProducerRegistryFactory(internalBus , new[] { publication }, InstrumentationOptions.All)
.Create();
})
.AddConsumers(options =>
{
options.Subscriptions = new Subscription[]
{
new Subscription<GreetingMade>(
new SubscriptionName("GreetingAnalytics"),
new ChannelName("greeting.event"),
new RoutingKey("GreetingMade")
)
};
options.ChannelFactory = new InMemoryChannelFactory(internalBus, TimeProvider.System);
})
.AutoFromAssemblies()
.AddHostedService<ServiceActivatorHostedService>();
}
}Limitations
No persistence: Messages are lost if the process crashes
Single process: Cannot distribute across multiple instances
No backpressure: Unlimited queue growth (memory bound)
No dead letter queues: Failed messages are discarded
No message TTL: Messages never expire
InMemory Outbox
The InMemory Outbox provides transactional messaging support without requiring a database. Note that if you do not specify a persistent Outbox, we will use the InMemoryOutbox, by default. Any use of the CommandProcessor's Post method uses the default InMemoryOutbox and not the persistent Outbox, as it does not take a transaction provider as an argument.
Flush of Expired Messages
The InMemory Outbox will flush expired messages. You can configure the time limit for a message, after which it will be flushed:
EntryTimeToLive Defaults to 5 minutes. Governs how long a message can remain in the Outbox.
ExpirationScanInterval Defaults to 10 mins. Governs how often a scan for expired messages runs.
Compaction of the InMemoryOutbox
The InMemoryOutbox's capacity is constrained. You can configure the limit to the number of messages the Outbox contains. If you are using the InMemoryOutbox in production scenarios, you should pay attention to this limit. Once the limit is hit, the Outbox will compact, removing older messages first. You can set a compaction percentage, which governs how many messages will be purged from the InMemoryOutbox when we compact.
EntryLimit Defaults to 2048. Governs how many messages the InMemoryOutbox can hold.
CompactionPercentage When we hit a capacity limit, what percentage of messages should we purge.
When to Use
Perfect for:
Testing transactional messaging patterns
Unit testing the Outbox pattern
Development without database dependencies
Production Use Cases (limited):
Single-process applications
Non-critical message publishing - the InMemoryOutbox is used in place of a persistent Outbox
Scenarios where message loss on restart is acceptable
Configuration
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
})
.AddProducers(options =>
{
options.ProducerRegistry = /* your producer registry */;
options.Outbox = new InMemoryOutbox();
})
.UseOutboxSweeper(); // Enable sweeper for reliabilityExample of Post
public class CreatePersonHandler : RequestHandlerAsync<CreatePerson>
{
private readonly IAmACommandProcessor _commandProcessor;
private readonly IAmAnOutboxAsync<Message, CommittableTransaction> _outbox;
private readonly PersonRepository _repository;
public override async Task<CreatePerson> HandleAsync(
CreatePerson command,
CancellationToken cancellationToken = default)
{
// Start an in-memory transaction (no real transaction support)
var person = new Person(command.Name, command.Email);
await _repository.SaveAsync(person);
// Deposit message to outbox (held in memory)
await _commandProcessor.Post(new PersonCreated { PersonId = person.Id }, cancellationToken: cancellationToken);
return await base.HandleAsync(command, cancellationToken);
}
}Limitations
No persistence: Messages lost on application restart
No transactions: Cannot participate in database transactions
Single process: State not shared across instances
Memory bound: All outstanding messages held in memory
InMemory Inbox
The InMemory Inbox provides message deduplication without requiring a database.
When to Use
Perfect for:
Unit testing duplicate message handling
Development without database dependencies
Production Use Cases (limited):
Single-process applications
Short-lived message deduplication windows
Non-critical deduplication scenarios
Configuration
var bus = new InternalBus();
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
})
.AddConsumers(options =>
{
options.Inbox = new InboxConfiguration(
new InMemoryInbox(TimeProvider.System),
InboxConfiguration.NoActionOnExists
);
options.Subscriptions = subscriptions;
options.ChannelFactory = new InMemoryChannelFactory(bus);
})
.AutoFromAssemblies();Example Usage
[UseInboxAsync(step: 0, contextKey: typeof(PersonCreatedHandler), onceOnly: true)]
public class PersonCreatedHandler : RequestHandlerAsync<PersonCreated>
{
private readonly PersonRepository _repository;
[UseInboxAsync(0, typeof(PersonCreatedHandler), true)]
public override async Task<PersonCreated> HandleAsync(
PersonCreated @event,
CancellationToken cancellationToken = default)
{
// Inbox ensures this handler processes each message only once
var person = await _repository.GetByIdAsync(@event.PersonId);
person.MarkAsCreated();
await _repository.SaveAsync(person);
return await base.HandleAsync(@event, cancellationToken);
}
}Limitations
No persistence: Deduplication state lost on restart
Single process: Cannot deduplicate across instances
Memory bound: All seen message IDs held in memory
No cleanup: Old entries remain until process restart
InMemory Scheduler
The InMemory Scheduler provides delayed message execution without requiring Quartz, Hangfire, or cloud schedulers.
When to Use
See the complete InMemory Scheduler documentation for detailed information.
Perfect for:
Unit and integration tests
Local development
Demos and proof-of-concepts
Production Use Cases (very limited):
Non-critical scheduled work
Short delays (minutes, not hours/days)
Acceptable to lose scheduled work on restart
Configuration
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
})
.UseScheduler(new InMemorySchedulerFactory())
.AutoFromAssemblies();Example Usage
public class OrderService
{
private readonly IAmACommandProcessor _commandProcessor;
public async Task CreateOrder(Order order)
{
await _repository.SaveAsync(order);
// Schedule confirmation email for 5 minutes later
var schedulerId = await _commandProcessor.SendAsync(
TimeSpan.FromMinutes(5),
new SendOrderConfirmationCommand { OrderId = order.Id }
);
}
}For complete documentation, see InMemory Scheduler.
InMemory Archive
The InMemory Archive stores dispatched messages in memory for diagnostics and replay.
When to Use
Perfect for:
Testing message archiving
Development and debugging
Inspecting sent messages in tests
Not recommended for production due to unbounded memory growth.
Configuration
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
})
.UseOutboxArchiver(new InMemoryArchiveProvider())
.AddProducers(/* producer configuration */);Example Usage
public class LargeMessageMapper : IAmAMessageMapper<LargeDataCommand>
{
private readonly IAmAStorageProviderAsync _storageProvider;
[ClaimCheck(0, thresholdInKb: 5)] // Store payloads > 5KB
public Message MapToMessage(LargeDataCommand request)
{
var header = new MessageHeader(
messageId: request.Id,
topic: new RoutingKey("LargeData"),
messageType: MessageType.MT_COMMAND
);
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
}Complete Testing Example
Here's a complete example showing how to use multiple InMemory components together:
public class IntegrationTests : IDisposable
{
private readonly ServiceProvider _serviceProvider;
private readonly IAmACommandProcessor _commandProcessor;
private readonly InMemoryMessageProducer _inMemoryProducer;
private readonly _internalBus = new InternalBus();
public IntegrationTests()
{
var services = new ServiceCollection();
var internalBus = new InternalBus();
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
})
.AddProducers(options =>
{
var publication = new Publication() { Topic = new RoutingKey("PersonCreated") };
options.ProducerRegistry = new InMemoryProducerRegistryFactory(_internalBus , new[] { publication }, InstrumentationOptions.All)
.Create();
options.Outbox = new InMemoryOutbox();
})
.AddConsumers(options =>
{
// InMemory Inbox for deduplication
options.Inbox = new InboxConfiguration(
new InMemoryInbox(TimeProvider.System),
InboxConfiguration.NoActionOnExists
);
options.Subscriptions = new Subscription[]
{
new InMemorySubscription<PersonCreated>(
new SubscriptionName("PersonAnalytics"),
new ChannelName("person.created"),
new RoutingKey("PersonCreated")
)
};
options.ChannelFactory = new InMemoryChannelFactory(_internalBus, TimeProvider.System);
})
.UseScheduler(new InMemorySchedulerFactory()) // InMemory Scheduler
.UseInMemoryArchiveProvider() // InMemory Archive
.AutoFromAssemblies();
_serviceProvider = services.BuildServiceProvider();
_commandProcessor = _serviceProvider.GetRequiredService<IAmACommandProcessor>();
}
[Fact]
public async Task Should_Publish_And_Consume_Message_With_InMemory_Components()
{
// Arrange
var command = new CreatePersonCommand { Name = "Alice", Email = "alice@example.com" };
// Act - Publish with InMemory Outbox
await _commandProcessor.SendAsync(command);
await _commandProcessor.ClearOutboxAsync();
// Wait for InMemory consumer to process
await Task.Delay(100);
var messages = _internalBus.Stream(new RoutingKey("PersonCreated"));
Assert.Any(messages);
}
[Fact]
public async Task Should_Schedule_Message_With_InMemory_Scheduler()
{
// Arrange
var command = new SendEmailCommand { To = "alice@example.com" };
// Act - Schedule with InMemory Scheduler
var schedulerId = await _commandProcessor.SendAsync(
TimeSpan.FromMilliseconds(100),
command
);
// Assert - Wait for execution
await Task.Delay(150);
var messages = _internalBus.Stream(new RoutingKey("PersonCreated"));
Assert.Any(messages);
}
public void Dispose()
{
_serviceProvider?.Dispose();
}
}Environment-Specific Configuration
Use InMemory components for development/testing, production components elsewhere:
public static class BrighterConfiguration
{
public static IServiceCollection AddBrighterWithEnvironmentConfig(
this IServiceCollection services,
IHostEnvironment environment,
IConfiguration configuration)
{
var internalBus = new InternalBus();
services.AddBrighter(options =>
{
options.HandlerLifetime = ServiceLifetime.Scoped;
})
.AddProducers(options =>
{
options.ProducerRegistry = GetProducerRegistry(environment, configuration, internalBus);
})
.UseOutbox(GetOutbox(environment, configuration))
.UseScheduler(GetSchedulerFactory(environment, configuration))
.AddConsumers(options =>
{
options.Inbox = GetInbox(environment, configuration);
options.Subscriptions = GetSubscriptions();
options.ChannelFactory = GetChannelFactory(environment, configuration, internalBus);
})
.AutoFromAssemblies();
return services;
}
private static IAmAProducerRegistry GetProducerRegistry(
IHostEnvironment environment,
IConfiguration configuration,
IAmABus bus)
{
if (environment.IsDevelopment() || environment.IsEnvironment("Testing"))
{
return new InMemoryProducerRegistryFactory(bus , new[] { publication }, InstrumentationOptions.All)
.Create();
}
// Production: RabbitMQ, Kafka, AWS SQS, etc.
return new RmqProducerRegistryFactory(/* production config */).Create();
}
private static IMessageSchedulerFactory GetSchedulerFactory(
IHostEnvironment environment,
IConfiguration configuration)
{
if (environment.IsDevelopment() || environment.IsEnvironment("Testing"))
{
return new InMemorySchedulerFactory();
}
// Production: Quartz, Hangfire, AWS Scheduler, etc.
return new HangfireMessageSchedulerFactory(
configuration.GetConnectionString("Hangfire")
);
}
private static IAmAnOutbox<Message, CommittableTransaction> GetOutbox(
IHostEnvironment environment,
IConfiguration configuration)
{
if (environment.IsDevelopment() || environment.IsEnvironment("Testing"))
{
return new InMemoryOutbox(TimeProvider.System);
}
// Production: SQL Server, PostgreSQL, MySQL, DynamoDB, etc.
return new MsSqlOutbox(/* production config */);
}
private static InboxConfiguration GetInbox(
IHostEnvironment environment,
IConfiguration configuration)
{
if (environment.IsDevelopment() || environment.IsEnvironment("Testing"))
{
return new InboxConfiguration(
new InMemoryInbox(TimeProvider.System),
InboxConfiguration.NoActionOnExists
);
}
// Production: SQL Server, PostgreSQL, MySQL, DynamoDB, etc.
return new InboxConfiguration(
new MsSqlInbox(/* production config */),
InboxConfiguration.NoActionOnExists
);
}
private static IAmAChannelFactory GetChannelFactory(
IHostEnvironment environment,
IConfiguration configuration,
IAmABus bus)
{
if (environment.IsDevelopment() || environment.IsEnvironment("Testing"))
{
return new InMemoryChannelFactory(bus);
}
// Production: RabbitMQ, Kafka, AWS SQS, etc.
return new ChannelFactory(new RmqMessageConsumerFactory(/* config */));
}
}Comparison with Production Components
Persistence
None
Database/Disk
Distribution
Single process
Multi-instance
Durability
None
ACID guarantees
Performance
Very fast
Network/IO bound
Setup
Zero config
Requires infrastructure
Testing
Ideal
Complex setup
Production
Limited
Recommended
Migration to Production
When moving to production, replace InMemory components:
InMemory Transport
RabbitMQ, Kafka, AWS SQS, Azure Service Bus
InMemory Outbox
MS SQL, PostgreSQL, MySQL, DynamoDB, MongoDB
InMemory Inbox
MS SQL, PostgreSQL, MySQL, DynamoDB, MongoDB
InMemory Scheduler
Quartz, Hangfire, AWS Scheduler, Azure Service Bus Scheduler
InMemory Archive
Database-backed archive provider
No code changes required - just swap the registration in your DI container!
Summary
Brighter V10 provides comprehensive InMemory options for all major components:
Best For:
Unit and integration testing
Local development
Demos and POCs
CI/CD pipelines (fast, no external dependencies)
Not Recommended For:
Production systems requiring durability
Distributed/multi-instance applications
Long-running scheduled work
Use InMemory options to accelerate development and testing, then migrate to production components for deployed applications with durability and distribution requirements.
Last updated
Was this helpful?
