LogoLogo
  • README
  • VERSION 9
    • Version Begins
  • Overview
    • Show me the code!
    • Basic Concepts
    • Why Brighter?
  • Brighter Configuration
    • Basic Configuration
    • How Configuring the Command Processor Works
    • How Configuring a Dispatcher for an External Bus Works
    • RabbitMQ Configuration
    • AWS SNS Configuration
    • Kafka Configuration
    • Azure Service Bus Configuration
    • Azure Archive Provider Configuration
  • Darker Configuration
    • Basic Configuration
  • Brighter Request Handlers and Middleware Pipelines
    • Building an Async Pipeline of Request Handlers
    • Basic Configuration
    • How to Implement an Async Request Handler
    • Requests, Commands and an Events
    • Dispatching Requests
    • Dispatching An Async Request
    • Returning results from a Handler
    • Using an External Bus
    • Message Mappers
    • Routing
    • Building a Pipeline of Request Handlers
    • Passing information between Handlers in the Pipeline
    • Failure and Dead Letter Queues
    • Supporting Retry and Circuit Breaker
    • Failure and Fallback
    • Feature Switches
  • Guaranteed At Least Once
    • Outbox Support
    • Inbox Support
    • EFCore Outbox
    • Dapper Outbox
    • Dynamo Outbox
    • MSSQL Inbox
    • MySQL Inbox
    • Postgres Inbox
    • Sqlite Inbox
    • Dynamo Inbox
  • Darker Query Handlers and Middleware Pipelines
    • How to Implement a Query Handler
  • Health Checks and Observability
    • Logging
    • Monitoring
    • Health Checks
    • Telemetry
  • Command, Processors and Dispatchers
    • Command, Processor and Dispatcher Patterns
  • Under the Hood
    • How The Command Processor Works
    • How Service Activator Works
  • Event Driven Architectures
    • Microservices
    • Event Driven Collaboration
    • Event Carried State Transfer
    • Outbox Pattern
  • Task Queues
    • Using a Task Queue
  • FAQ
    • FAQ
  • END OF VERSION
    • Version Ends
  • VERSION 10
    • Version Begins
  • Overview
    • Show me the code!
    • Basic Concepts
    • Why Brighter?
  • Brighter Configuration
    • Basic Configuration
    • How Configuring the Command Processor Works
    • How Configuring a Dispatcher for an External Bus Works
    • RabbitMQ Configuration
    • AWS SNS Configuration
    • Kafka Configuration
    • Azure Service Bus Configuration
    • Azure Archive Provider Configuration
  • Darker Configuration
    • Basic Configuration
  • Brighter Request Handlers and Middleware Pipelines
    • Building an Async Pipeline of Request Handlers
    • Basic Configuration
    • How to Implement an Async Request Handler
    • Requests, Commands and an Events
    • Dispatching Requests
    • Dispatching An Async Request
    • Returning results from a Handler
    • Using an External Bus
    • Message Mappers
    • Routing
    • Building a Pipeline of Request Handlers
    • Passing information between Handlers in the Pipeline
    • Failure and Dead Letter Queues
    • Supporting Retry and Circuit Breaker
    • Failure and Fallback
    • Feature Switches
  • Guaranteed At Least Once
    • Outbox Support
    • Inbox Support
    • EFCore Outbox
    • Dapper Outbox
    • Dynamo Outbox
    • MSSQL Inbox
    • MySQL Inbox
    • Postgres Inbox
    • Sqlite Inbox
    • Dynamo Inbox
  • Darker Query Handlers and Middleware Pipelines
    • How to Implement a Query Handler
  • Health Checks and Observability
    • Logging
    • Monitoring
    • Health Checks
    • Telemetry
  • Command, Processors and Dispatchers
    • Command, Processor and Dispatcher Patterns
  • Under the Hood
    • How The Command Processor Works
    • How Service Activator Works
  • Event Driven Architectures
    • Microservices
    • Event Driven Collaboration
    • Event Carried State Transfer
    • Outbox Pattern
  • Task Queues
    • Using a Task Queue
  • FAQ
    • FAQ
  • END OF VERSION
    • Version Ends
Powered by GitBook
On this page

Was this helpful?

Edit on GitHub
  1. Guaranteed At Least Once

Dynamo Outbox

PreviousDapper OutboxNextMSSQL Inbox

Last updated 2 years ago

Was this helpful?

Usage

The DynamoDb Outbox allows integration between DynamoDb and . The configuration is described in .

To support transactional messaging when using DynamoDb requires us to use DynamoDb's support for ACID transactions. You should understand best practices for using transactions with DynamoDb.

For this we will need the Outbox package for DynamoDb:

  • Paramore.Brighter.Outbox.DynamoDB

Paramore.Brighter.Outbox.DynamoDb will pull in another package:

  • Paramore.Brighter.DynamoDb

As described in , we configure Brighter to use an outbox with the Use{DB}Outbox method call.

As we want to use DynamoDb with the outbox, we also call: Use{DB}TransactionConnectionProvider so that we can share your transaction scope when persisting messages to the outbox.

public void ConfigureServices(IServiceCollection services)
{
    services.AddBrighter(...)
        .UseExternalBus(...)
        .UseDynamoDbOutbox(ServiceLifetime.Singleton)
        .UseDynamoDbTransactionConnectionProvider(typeof(DynamoDbUnitOfWork), ServiceLifetime.Scoped)
        .UseOutboxSweeper()

        ...
}

In our handler we take a dependency on Brighter's IAmABoxTransactionConnectionProvider interface and convert it to a DynamoDbUnitofWork. We explicitly start a transaction within the handler on the Database within the Unit of Work.

We call DepositPostAsync within that transaction to write the message to the Outbox. Once the transaction has closed we can call ClearOutboxAsync to immediately clear, or we can rely on the Outbox Sweeper, if we have configured one to clear for us. (There are equivalent synchronous versions of these APIs).x

public override async Task<AddGreeting> HandleAsync(AddGreeting addGreeting, CancellationToken cancellationToken = default(CancellationToken))
{
	var posts = new List<Guid>();

	//We use the unit of work to grab connection and transaction, because Outbox needs
	//to share them 'behind the scenes'
	var context = new DynamoDBContext(_unitOfWork.DynamoDb);
	var transaction = _unitOfWork.BeginOrGetTransaction();
	try
	{
		var person = await context.LoadAsync<Person>(addGreeting.Name);

		person.Greetings.Add(addGreeting.Greeting);

		var document = context.ToDocument(person);
		var attributeValues = document.ToAttributeMap();

		//write the added child entity to the Db - just replace the whole entity as we grabbed the original
		//in production code, an update expression would be faster
		transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = "People", Item = attributeValues}});

		//Now write the message we want to send to the Db in the same transaction.
		posts.Add(await _postBox.DepositPostAsync(new GreetingMade(addGreeting.Greeting), cancellationToken: cancellationToken));

		//commit both new greeting and outgoing message
		await _unitOfWork.CommitAsync(cancellationToken);
	}
	catch (Exception e)
	{   
		_logger.LogError(e, "Exception thrown handling Add Greeting request");
		//it went wrong, rollback the entity change and the downstream message
		_unitOfWork.Rollback();
		return await base.HandleAsync(addGreeting, cancellationToken);
	}

	//Send this message via a transport. We need the ids to send just the messages here, not all outstanding ones.
	//Alternatively, you can let the Sweeper do this, but at the cost of increased latency
	await _postBox.ClearOutboxAsync(posts, cancellationToken:cancellationToken);

	return await base.HandleAsync(addGreeting, cancellationToken);
}
Brighter's outbox support
Basic Configuration
Basic Configuration