Show me the code!

There is an old principle: "show, don't tell". This introduction is about showing you what you can do with Brighter and Darker. It's not about how - more detailed documentation elsewhere shows you how to write this code. It's not about why - articles elsewhere discuss some of the reasons behind this approach. It is simply: "let me see how Brighter works".

Brighter and Darker

Brighter is about Requests

A Request is a message sent over a bus. A request may update state.

A Command is an instruction to execute some behavior. An Event is a notification.

You use the Command Processor to separate the sender from the receiver, and to provide middleware functionality like a retry.

Darker is about Queries

A Query is a message executed via a bus that returns a Result. A query does not update state.

You use the Query Processor to separate the requester from the replier, and to provide middleware functionality like a retry.

Middleware

Both Brighter and Darker allow you to provide middleware that runs between a request or query being made and being handled. The middleware used by a handler is configured by attributes.

Sending and Querying Example

In this example, we show sending a command, and querying for the results of issuing it, from within an ASP.NET WebAPI controller method.

/// <summary>
/// Sends an <c>AddGreeting</c> command for the named person, then queries for that person's greetings.
/// </summary>
/// <param name="name">Name taken from the route; used for both the command and the query.</param>
/// <param name="newGreeting">Request payload whose <c>Greeting</c> is passed to the command.</param>
/// <returns>The query result wrapped in <c>Ok</c>, or a <c>NotFoundResult</c> when the query result is null.</returns>
[Route("{name}/new")]
[HttpPost]
public async Task<ActionResult<FindPersonsGreetings>> Post(string name, NewGreeting newGreeting)
{
	// Write side: issue the command through the command processor.
	var addGreeting = new AddGreeting(name, newGreeting.Greeting);
	await _commandProcessor.SendAsync(addGreeting);

	// Read side: ask the query processor for the outcome of the command.
	var findGreetings = new FindGreetingsForPerson(name);
	var result = await _queryProcessor.ExecuteAsync(findGreetings);

	if (result != null)
	{
		return Ok(result);
	}

	return new NotFoundResult();
}

Handling Examples

Handler code listens for and responds to requests or queries. Example handlers for a request (here, adding a person) and for the above query are:

/// <summary>
/// Inserts a row into the Person table using the name carried by the command,
/// then hands the command to the base handler.
/// </summary>
/// <param name="addPerson">The command; only its <c>Name</c> is written.</param>
/// <param name="cancellationToken">Passed to the connection provider and to the base handler.</param>
/// <returns>Whatever the base handler returns for this command.</returns>
[RequestLoggingAsync(0, HandlerTiming.Before)]
[UsePolicyAsync(step:1, policy: Policies.Retry.EXPONENTIAL_RETRYPOLICYASYNC)]
public override async Task<AddPerson> HandleAsync(AddPerson addPerson, CancellationToken cancellationToken = default)
{
	const string insertPersonSql = "insert into Person (Name) values (@Name)";

	// Declared with a using declaration so the connection stays open until the method completes.
	await using var db = await _relationalDbConnectionProvider.GetConnectionAsync(cancellationToken);

	var insertArgs = new { addPerson.Name };
	await db.ExecuteAsync(insertPersonSql, insertArgs);

	return await base.HandleAsync(addPerson, cancellationToken);
}
/// <summary>
/// Finds the person named in the query together with their greetings.
/// </summary>
/// <param name="query">The query; its <c>Name</c> selects the person and is echoed back when nothing is found.</param>
/// <param name="cancellationToken">Passed to the connection provider when the connection is requested.</param>
/// <returns>
/// A <c>FindPersonsGreetings</c> holding the person's name and one <c>Salutation</c> per greeting,
/// or one with the requested name and an empty greeting collection when no rows match.
/// </returns>
[QueryLogging(0)]
[RetryableQuery(1, Retry.EXPONENTIAL_RETRYPOLICYASYNC)]
public override async Task<FindPersonsGreetings> ExecuteAsync(FindGreetingsForPerson query, CancellationToken cancellationToken = new CancellationToken())
{
	//Retrieving parent and child is a bit tricky with Dapper. From raw SQL we get back a set that has a row-per-child. We need to turn that
	//into one entity per parent, with a collection of children. To do that we bring everything back into memory, group by parent id and collate all
	//the children for that group.

	//Restrict the rows to the requested person. Without the filter every person with a greeting comes back,
	//and the Single() call below throws as soon as more than one person exists.
	var sql = @"select p.Id, p.Name, g.Id, g.Message 
		from Person p
		inner join Greeting g on g.Recipient_Id = p.Id
		where p.Name = @name";
	await using var connection = await _relationalDbConnectionProvider.GetConnectionAsync(cancellationToken);
	var people = await connection.QueryAsync<Person, Greeting, Person>(sql, (parent, greeting) =>
	{
		//each row carries exactly one greeting; attach it to the parent built for that row
		parent.Greetings.Add(greeting);

		return parent;
	}, new {name = query.Name}, splitOn: "Id");
	
	if (!people.Any())
	{
		return new FindPersonsGreetings(){Name = query.Name, Greetings = Array.Empty<Salutation>()};
	}

	//collapse the row-per-greeting set into one Person per id, carrying all of that person's greetings
	var peopleGreetings = people.GroupBy(p => p.Id).Select(grp =>
	{
		var groupedPerson = grp.First();
		groupedPerson.Greetings = grp.Select(p => p.Greetings.Single()).ToList();
		return groupedPerson;
	});

	var person = peopleGreetings.Single();

	return new FindPersonsGreetings
	{
		Name = person.Name, Greetings = person.Greetings.Select(g => new Salutation(g.Greet()))
	};
}

}

Using an External Bus

As well as using an Internal Bus, in Brighter you can use an External Bus - middleware such as RabbitMQ or Kafka - to send a request between processes. Brighter supports sending a request, and provides a Dispatcher that can listen for requests on middleware and forward them to a handler.

The following code sends a request to another process.

/// <summary>
/// Looks up the person named by the command and, when found, stores a new greeting for them and
/// deposits a <c>GreetingMade</c> message using the same transaction provider, then commits.
/// Afterwards the deposited messages are cleared from the Outbox and the command is passed to the base handler.
/// </summary>
/// <param name="addGreeting">The command carrying the person's name and the greeting text.</param>
/// <param name="cancellationToken">Flowed to the transaction provider, the post box and the base handler.</param>
/// <returns>Whatever the base handler returns for this command.</returns>
[RequestLoggingAsync(0, HandlerTiming.Before)]
[UsePolicyAsync(step:1, policy: Policies.Retry.EXPONENTIAL_RETRYPOLICYASYNC)]
public override async Task<AddGreeting> HandleAsync(AddGreeting addGreeting, CancellationToken cancellationToken = default)
{
	var depositedIds = new List<Guid>();

	// Connection and transaction are obtained from the transaction provider because the
	// Outbox deposit below is handed that same provider.
	var connection = await _transactionProvider.GetConnectionAsync(cancellationToken);
	var transaction = await _transactionProvider.GetTransactionAsync(cancellationToken);
	try
	{
		var matches = await connection.QueryAsync<Person>(
			"select * from Person where name = @name",
			new { name = addGreeting.Name },
			transaction);
		var recipient = matches.SingleOrDefault();

		if (recipient != null)
		{
			var greeting = new Greeting(addGreeting.Greeting, recipient);

			// Persist the new child entity.
			await connection.ExecuteAsync(
				"insert into Greeting (Message, Recipient_Id) values (@Message, @RecipientId)",
				new { greeting.Message, greeting.RecipientId },
				transaction);

			// Deposit the outgoing message, passing the same transaction provider.
			var greetingMade = new GreetingMade(greeting.Greet());
			var depositedId = await _postBox.DepositPostAsync(
				greetingMade,
				_transactionProvider,
				cancellationToken: cancellationToken);
			depositedIds.Add(depositedId);

			// Commit the greeting and the outgoing message together.
			await _transactionProvider.CommitAsync(cancellationToken);
		}
	}
	catch (Exception e)
	{
		_logger.LogError(e, "Exception thrown handling Add Greeting request");

		// Something failed: roll back both the entity change and the deposited message,
		// then carry on to the base handler without clearing the Outbox.
		await _transactionProvider.RollbackAsync(cancellationToken);
		return await base.HandleAsync(addGreeting, cancellationToken);
	}
	finally
	{
		_transactionProvider.Close();
	}

	// Send only the messages deposited here (by id) rather than everything outstanding.
	// The Sweeper could do this instead, at the cost of increased latency.
	await _postBox.ClearOutboxAsync(depositedIds, cancellationToken:cancellationToken);

	return await base.HandleAsync(addGreeting, cancellationToken);
}

The following code receives a message, sent from another process, via a dispatcher. It uses an Inbox to ensure that it does not process duplicate messages.

/// <summary>
/// Stores a salutation for the received greeting and deposits a <c>SalutationReceived</c> message
/// using the same transaction provider, commits, and then clears the deposited message from the Outbox.
/// </summary>
/// <param name="event">The received event; its <c>Greeting</c> becomes the stored salutation.</param>
/// <returns>Whatever the base handler returns for this event.</returns>
[UseInbox(step:0, contextKey: typeof(GreetingMadeHandler), onceOnly: true )]
[RequestLogging(step: 1, timing: HandlerTiming.Before)]
[UsePolicy(step:2, policy: Policies.Retry.EXPONENTIAL_RETRYPOLICY)]
public override GreetingMade Handle(GreetingMade @event)
{
	var posts = new List<Guid>();

	var tx = _transactionConnectionProvider.GetTransaction();
	var conn = tx.Connection;
	try
	{
		var salutation = new Salutation(@event.Greeting);

		conn.Execute(
			"insert into Salutation (greeting) values (@greeting)",
			new {greeting = salutation.Greeting},
			tx);

		//write the outgoing message through the same transaction provider as the entity write
		posts.Add(_postBox.DepositPost(
			new SalutationReceived(DateTimeOffset.Now),
			_transactionConnectionProvider));

		_transactionConnectionProvider.Commit();
	}
	catch (Exception e)
	{
		_logger.LogError(e, "Could not save salutation");

		//if it went wrong rollback entity write and Outbox write
		_transactionConnectionProvider.Rollback();

		return base.Handle(@event);
	}
	finally
	{
		//release the provider's connection/transaction on every path, matching the async AddGreeting handler
		//NOTE(review): assumes this provider exposes Close() like _transactionProvider does - confirm
		_transactionConnectionProvider.Close();
	}

	//send only the messages deposited by this handler
	_postBox.ClearOutbox(posts.ToArray());

	return base.Handle(@event);
}

Last updated