PostgreSQL Message Broker

Brighter supports using PostgreSQL as a message broker, enabling pub/sub messaging patterns on top of your existing PostgreSQL infrastructure.

Overview

The PostgreSQL message broker uses a table-based queue approach where messages are stored in a PostgreSQL table and retrieved by consumers. This provides a lightweight messaging solution that leverages your existing PostgreSQL database without requiring additional message broker infrastructure.

How It Works

  1. Producer: Inserts messages into a queue store table

  2. Consumer: Retrieves messages from the queue store table based on visibility timeout

  3. Acknowledgement: Deletes processed messages from the table

  4. Reject/Requeue: Deletes or updates messages based on processing outcome

The system uses a visibility timeout mechanism (similar to AWS SQS) where messages become invisible to other consumers once retrieved, preventing duplicate processing.
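
Brighter's PostgreSQL gateway performs all of these steps for you. Purely as a conceptual illustration of a table-based queue with a visibility timeout (not the SQL Brighter actually issues), the following sketch uses Npgsql directly against the public.brighter_messages table defined in the Configuration section below:

using Npgsql;

// Conceptual sketch only: Brighter's PostgreSQL gateway performs these steps for you.
// The table layout matches the DDL shown in the Configuration section.
const string connectionString = "Host=localhost;Database=myapp;Username=user;Password=pass";

await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();

// 1. Producer: insert a message into the queue store table
await using (var insert = new NpgsqlCommand(
    """
    INSERT INTO public.brighter_messages ("queue", "content")
    VALUES (@queue, @content)
    """, connection))
{
    insert.Parameters.AddWithValue("queue", "orders.created");
    insert.Parameters.Add(new NpgsqlParameter("content", NpgsqlTypes.NpgsqlDbType.Jsonb)
    {
        Value = """{"orderId": 42}"""
    });
    await insert.ExecuteNonQueryAsync();
}

// 2. Consumer: claim the oldest visible message and push its visibility timeout
//    forward so that other consumers skip it while it is being processed
long? claimedId;
await using (var claim = new NpgsqlCommand(
    """
    UPDATE public.brighter_messages
    SET "visible_timeout" = CURRENT_TIMESTAMP + INTERVAL '30 seconds'
    WHERE "id" = (
        SELECT "id" FROM public.brighter_messages
        WHERE "queue" = @queue AND "visible_timeout" <= CURRENT_TIMESTAMP
        ORDER BY "id"
        LIMIT 1
        FOR UPDATE SKIP LOCKED)
    RETURNING "id"
    """, connection))
{
    claim.Parameters.AddWithValue("queue", "orders.created");
    claimedId = (long?)await claim.ExecuteScalarAsync();
}

// 3. Acknowledge: delete the message after successful processing. If the consumer
//    crashes instead, the visibility timeout expires and the message becomes
//    visible to other consumers again.
if (claimedId is not null)
{
    await using var ack = new NpgsqlCommand(
        """DELETE FROM public.brighter_messages WHERE "id" = @id""", connection);
    ack.Parameters.AddWithValue("id", claimedId.Value);
    await ack.ExecuteNonQueryAsync();
}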


Benefits

Use Existing Infrastructure

  • No additional services: Uses your existing PostgreSQL database

  • Simplified operations: One less service to manage, monitor, and maintain

  • Reduced costs: No separate message broker licensing or infrastructure

Transactional Messaging

  • Atomic operations: Messages and business data in the same database

  • Strong consistency: ACID guarantees for message operations

  • Simplified transactions: No distributed transactions needed

Familiar Tooling

  • Standard SQL: Use familiar PostgreSQL tools for monitoring and debugging

  • Built-in monitoring: Query tables directly to see queue depth and message status

  • Easy troubleshooting: Direct database access for investigating issues


When to Use

Ideal For:

  • Low to moderate message volumes (< 1000 messages/second)

  • Applications already using PostgreSQL for data persistence

  • Transactional messaging scenarios requiring atomicity with database operations

  • Development and testing with simplified infrastructure

  • Microservices where each service has its own PostgreSQL database

Not Suitable For:

  • High-volume scenarios (> 1000 messages/second)

  • Large messages (PostgreSQL has practical limits for row sizes)

  • Complex routing requirements (better served by RabbitMQ or Kafka)

  • Cross-organization messaging (where dedicated broker provides better isolation)


Limitations

Performance Constraints

  • Database overhead: Message operations add load to your database

  • Polling model: Consumers poll the database periodically (not push-based)

  • Scalability limits: Database connection pooling and table locking can become bottlenecks

Message Size

  • Practical limit: ~1MB per message (PostgreSQL row size limits)

  • Recommendation: Use the Claim Check pattern for large payloads

No Native Routing

  • Simple pub/sub only: No complex routing like RabbitMQ exchanges

  • Queue-based: Each consumer reads from a specific queue (channel)

  • Manual fanout: Publish to multiple channels for fanout patterns (see the sketch below)
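
A minimal sketch of such a fanout, assuming two hypothetical event types (OrderCreatedForBillingEvent and OrderCreatedForShippingEvent) and the producer setup shown later on this page; each consumer group gets its own queue, and the producer publishes one copy per queue:

// Each consumer group gets its own queue (routing key); publishing one copy of
// the notification per queue gives a manual fanout. Both event types are
// hypothetical and would carry the same order data.
var publications = new List<PostgresPublication>
{
    new PostgresPublication<OrderCreatedForBillingEvent>
    {
        Topic = new RoutingKey("orders.created.billing")
    },
    new PostgresPublication<OrderCreatedForShippingEvent>
    {
        Topic = new RoutingKey("orders.created.shipping")
    }
};

// In the producing service: publish once per queue
await _commandProcessor.PublishAsync(new OrderCreatedForBillingEvent { OrderId = order.Id });
await _commandProcessor.PublishAsync(new OrderCreatedForShippingEvent { OrderId = order.Id });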


Configuration

NuGet Package

Install the PostgreSQL messaging gateway package:

dotnet add package Paramore.Brighter.MessagingGateway.Postgres

Database Table

Create the queue store table in your PostgreSQL database:

CREATE TABLE IF NOT EXISTS {schema}.{queue_store_table}
(
    "id" BIGSERIAL PRIMARY KEY,
    "queue" VARCHAR(255) NOT NULL,
    "content" JSONB NOT NULL,
    "visible_timeout" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_{queue_store_table}_queue_visible
    ON {schema}.{queue_store_table}("queue", "visible_timeout");

Index Requirements: The index on (queue, visible_timeout) is critical for performance.
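
If you prefer to provision the table from application startup code rather than a migration script, a minimal sketch using Npgsql (the schema, table, and index names below match the example configuration used throughout this page):

using Npgsql;

// Creates the queue store table and its index if they do not already exist.
const string connectionString = "Host=localhost;Database=myapp;Username=user;Password=pass";

const string ddl = """
    CREATE TABLE IF NOT EXISTS public.brighter_messages
    (
        "id" BIGSERIAL PRIMARY KEY,
        "queue" VARCHAR(255) NOT NULL,
        "content" JSONB NOT NULL,
        "visible_timeout" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
        "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
    );

    CREATE INDEX IF NOT EXISTS idx_brighter_messages_queue_visible
        ON public.brighter_messages("queue", "visible_timeout");
    """;

await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
await using var command = new NpgsqlCommand(ddl, connection);
await command.ExecuteNonQueryAsync();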


Producer Configuration

Basic Producer Setup

using Paramore.Brighter;
using Paramore.Brighter.MessagingGateway.Postgres;
using Paramore.Brighter.PostgreSql;

// Database configuration
var postgresConfiguration = new RelationalDatabaseConfiguration(
    connectionString: "Host=localhost;Database=myapp;Username=user;Password=pass",
    queueStoreTable: "brighter_messages",
    schemaName: "public",
    binaryMessagePayload: true  // Use JSONB for better performance
);

// Publication configuration
var publications = new List<PostgresPublication>
{
    new PostgresPublication<OrderCreatedEvent>
    {
        Topic = new RoutingKey("orders.created"),
        SchemaName = "public",
        QueueStoreTable = "brighter_messages",
        BinaryMessagePayload = true  // JSONB
    }
};

// Producer registry
var producerRegistry = new PostgresProducerRegistryFactory(
    postgresConfiguration,
    publications
).Create();

// Configure Brighter
services.AddBrighter(options =>
{
    options.HandlerLifetime = ServiceLifetime.Scoped;
})
.AddProducers(configure =>
{
    configure.ProducerRegistry = producerRegistry;
})
.AutoFromAssemblies();

Publishing Messages

public class OrderService
{
    private readonly IAmACommandProcessor _commandProcessor;

    public OrderService(IAmACommandProcessor commandProcessor)
    {
        _commandProcessor = commandProcessor;
    }

    public async Task CreateOrderAsync(CreateOrderCommand command)
    {
        // Process order
        var order = ProcessOrder(command);

        // Publish event
        var orderCreatedEvent = new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.TotalAmount,
            CreatedAt = DateTime.UtcNow
        };

        await _commandProcessor.PublishAsync(orderCreatedEvent);
    }
}

Consumer Configuration

Basic Consumer Setup

using Paramore.Brighter;
using Paramore.Brighter.MessagingGateway.Postgres;
using Paramore.Brighter.PostgreSql;

// Database configuration
var postgresConfiguration = new RelationalDatabaseConfiguration(
    connectionString: "Host=localhost;Database=myapp;Username=user;Password=pass",
    queueStoreTable: "brighter_messages",
    schemaName: "public",
    binaryMessagePayload: true
);

// Subscription configuration
var subscriptions = new List<PostgresSubscription>
{
    new PostgresSubscription<OrderCreatedEvent>(
        channelName: new ChannelName("orders.created.consumer"),
        routingKey: new RoutingKey("orders.created"),
        bufferSize: 10,                         // Number of messages to retrieve at once
        noOfPerformers: 1,                      // Number of concurrent consumers
        timeOut: TimeSpan.FromSeconds(30),
        messagePumpType: MessagePumpType.Proactor,
        makeChannels: OnMissingChannel.Create,
        visibleTimeout: TimeSpan.FromSeconds(30), // Message visibility timeout
        schemaName: "public",
        queueStoreTable: "brighter_messages",
        binaryMessagePayload: true
    )
};

// Channel factory
var channelFactory = new PostgresChannelFactory(postgresConfiguration);

// Configure Brighter Consumer
services.AddConsumers(options =>
{
    options.Subscriptions = subscriptions;
    options.ChannelFactory = channelFactory;
})
.AutoFromAssemblies();

Consuming Messages

public class OrderCreatedEventHandler : RequestHandlerAsync<OrderCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<OrderCreatedEventHandler> _logger;

    public OrderCreatedEventHandler(
        IEmailService emailService,
        ILogger<OrderCreatedEventHandler> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    public override async Task<OrderCreatedEvent> HandleAsync(
        OrderCreatedEvent @event,
        CancellationToken cancellationToken = default)
    {
        _logger.LogInformation(
            "Processing order created event for Order {OrderId}",
            @event.OrderId);

        // Send confirmation email
        await _emailService.SendOrderConfirmationAsync(@event);

        return await base.HandleAsync(@event, cancellationToken);
    }
}

Configuration Options

PostgresPublication

| Property | Type | Description | Default |
|----------|------|-------------|---------|
| Topic | RoutingKey | Message routing key (queue name) | Required |
| SchemaName | string? | Database schema name | "public" |
| QueueStoreTable | string? | Queue store table name | From configuration |
| BinaryMessagePayload | bool? | Use JSONB instead of JSON | From configuration |

PostgresSubscription

| Property | Type | Description | Default |
|----------|------|-------------|---------|
| ChannelName | ChannelName | Consumer channel name | Required |
| RoutingKey | RoutingKey | Message routing key (queue name) | Required |
| BufferSize | int | Messages to retrieve per poll | 1 |
| NoOfPerformers | int | Concurrent consumer threads | 1 |
| VisibleTimeout | TimeSpan | Message visibility timeout | 30 seconds |
| SchemaName | string? | Database schema name | "public" |
| QueueStoreTable | string? | Queue store table name | From configuration |
| BinaryMessagePayload | bool? | Expect JSONB payloads | From configuration |
| TableWithLargeMessage | bool | Support for large messages as streams | false |

RelationalDatabaseConfiguration

| Property | Type | Description |
|----------|------|-------------|
| ConnectionString | string | PostgreSQL connection string |
| QueueStoreTable | string | Default queue store table name |
| SchemaName | string | Default schema name |
| BinaryMessagePayload | bool | Default to JSONB |


Message Visibility

The PostgreSQL message broker uses a visibility timeout mechanism to prevent duplicate processing:

How It Works

  1. Message Published: visible_timeout set to CURRENT_TIMESTAMP

  2. Message Retrieved: Consumer reads messages where visible_timeout <= CURRENT_TIMESTAMP and advances visible_timeout by the configured visibility timeout

  3. Processing: Message stays invisible to other consumers until the timeout expires; the timeout is not extended automatically while the handler runs

  4. Acknowledged: Message deleted from table

  5. Timeout Expires: If not acknowledged, message becomes visible again

Visibility Timeout Example

var subscription = new PostgresSubscription<OrderEvent>(
    // Message invisible for 60 seconds after retrieval
    visibleTimeout: TimeSpan.FromSeconds(60)
);

Recommendation: Set visibility timeout to 2-3x your expected processing time to account for retries and delays.


JSON vs JSONB

PostgreSQL supports two JSON data types:

| Feature | JSON | JSONB |
|---------|------|-------|
| Storage | Text-based | Binary |
| Performance | Slower queries | Faster queries |
| Size | Smaller | Larger (pre-parsed) |
| Indexing | Limited | Full indexing support |
| Recommendation | Low volume | Production use |

Configuration

// Use JSONB (recommended)
var configuration = new RelationalDatabaseConfiguration(
    connectionString: connectionString,
    queueStoreTable: "brighter_messages",
    binaryMessagePayload: true  // JSONB
);

// Use JSON (smaller storage)
var configuration = new RelationalDatabaseConfiguration(
    connectionString: connectionString,
    queueStoreTable: "brighter_messages",
    binaryMessagePayload: false  // JSON
);

Scheduled Messages

The PostgreSQL message broker supports message scheduling using the visibility timeout:

// Schedule message for future delivery
var delayedEvent = new OrderReminderEvent
{
    OrderId = orderId,
    ReminderText = "Your order ships tomorrow!"
};

await _commandProcessor.PublishAsync(
    delayedEvent,
    delay: TimeSpan.FromHours(24)  // Deliver in 24 hours
);

How it works: The visible_timeout is set to CURRENT_TIMESTAMP + delay, making the message invisible until the scheduled time.


Transactional Messaging

A key advantage of PostgreSQL as a message broker is transactional messaging with your business data:

Using the Outbox Pattern

public class OrderService
{
    private readonly OrderDbContext _dbContext;
    private readonly IAmACommandProcessor _commandProcessor;

    public async Task CreateOrderAsync(CreateOrderCommand command)
    {
        using var transaction = await _dbContext.Database.BeginTransactionAsync();

        try
        {
            // 1. Save order to database
            var order = new Order { /* ... */ };
            _dbContext.Orders.Add(order);
            await _dbContext.SaveChangesAsync();

            // 2. Deposit event to Outbox (same transaction)
            var orderCreatedEvent = new OrderCreatedEvent { /* ... */ };
            await _commandProcessor.DepositPostAsync(orderCreatedEvent);

            // 3. Commit transaction (atomically saves order and outbox message)
            await transaction.CommitAsync();

            // 4. Clear outbox to publish message
            await _commandProcessor.ClearOutboxAsync(new[] { orderCreatedEvent.Id });
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

See Outbox Pattern and PostgreSQL Outbox for more details.


Monitoring and Observability

Query Queue Depth

-- Current queue depth by queue
SELECT
    "queue",
    COUNT(*) as message_count,
    MIN("visible_timeout") as oldest_visible,
    MAX("created_at") as newest_message
FROM "public"."brighter_messages"
WHERE "visible_timeout" <= CURRENT_TIMESTAMP
GROUP BY "queue"
ORDER BY message_count DESC;

Query In-Flight Messages

-- Messages currently being processed (invisible)
SELECT
    "queue",
    COUNT(*) as in_flight_count
FROM "public"."brighter_messages"
WHERE "visible_timeout" > CURRENT_TIMESTAMP
GROUP BY "queue";

Find Stuck Messages

-- Messages invisible for too long (potential failures)
SELECT
    "id",
    "queue",
    "created_at",
    "visible_timeout",
    EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - "visible_timeout")) as seconds_overdue
FROM "public"."brighter_messages"
WHERE "visible_timeout" < CURRENT_TIMESTAMP - INTERVAL '5 minutes'
ORDER BY "created_at";

OpenTelemetry Integration

PostgreSQL message broker operations are automatically traced when OpenTelemetry is configured:

services.AddOpenTelemetry()
    .WithTracing(tracing =>
    {
        tracing
            .AddSource("paramore.brighter")
            .AddNpgsql()  // PostgreSQL spans
            .AddOtlpExporter();
    });

Best Practices

1. Use JSONB for Production

var configuration = new RelationalDatabaseConfiguration(
    connectionString: connectionString,
    binaryMessagePayload: true  // JSONB for better performance
);

2. Set Appropriate Visibility Timeout

var subscription = new PostgresSubscription<OrderEvent>(
    // 2-3x expected processing time
    visibleTimeout: TimeSpan.FromMinutes(5)  // Handler takes ~2 minutes max
);

3. Use Connection Pooling

// Connection string with pooling
var connectionString = "Host=localhost;Database=myapp;Username=user;Password=pass;" +
                      "Minimum Pool Size=5;Maximum Pool Size=20";  // Connection pool

4. Monitor Queue Depth

Set up alerts for queue depth:

-- Alert if queue depth > 1000
SELECT COUNT(*) FROM brighter_messages WHERE queue = 'orders.created';

5. Index Your Queue Table

-- Critical index for performance
CREATE INDEX IF NOT EXISTS idx_messages_queue_visible
    ON brighter_messages("queue", "visible_timeout");

6. Regular Cleanup

Acknowledged messages are deleted from the table, but unconsumed or orphaned messages can accumulate over time. Auto-vacuum reclaims dead rows but never removes old messages, so schedule a periodic cleanup:

-- Delete messages older than 7 days
DELETE FROM brighter_messages
WHERE "created_at" < CURRENT_TIMESTAMP - INTERVAL '7 days';

7. Use Claim Check for Large Messages

For messages > 100KB, use the Claim Check pattern:

[ClaimCheck(threshold: 102400, dataStore: typeof(S3LuggageStore))]
public class ProcessLargeOrderCommand : Command
{
    public byte[] LargePayload { get; set; }  // Stored in S3, not in database
}

8. Separate Queue Tables for High Volume

For high-volume queues, use dedicated tables:

// High-volume queue
var highVolumeSubscription = new PostgresSubscription<HighVolumeEvent>(
    queueStoreTable: "brighter_high_volume_messages"  // Separate table
);

// Normal queue
var normalSubscription = new PostgresSubscription<NormalEvent>(
    queueStoreTable: "brighter_messages"  // Shared table
);

Comparison with Other Transports

| Feature | PostgreSQL | RabbitMQ | Kafka | AWS SQS |
|---------|------------|----------|-------|---------|
| Setup Complexity | Low | Medium | High | Low |
| Throughput | Low-Medium | High | Very High | Medium |
| Message Size | ~1MB | 128MB | ~1MB | 256KB |
| Persistence | Database | Disk/Memory | Disk | Managed |
| Routing | Simple | Advanced | Topic-based | Simple |
| Transactional | Yes (local) | No | No | No |
| Ordering | Queue-level | Queue-level | Partition-level | FIFO queues |
| Operational Cost | Low (existing DB) | Medium | High | Pay-per-use |
| Best For | Low volume, transactional | General messaging | Event streaming | AWS ecosystem |


Troubleshooting

Messages Not Being Consumed

Problem: Messages remain in the queue but are not processed.

Solutions:

  1. Check whether the messages are currently invisible (their visible_timeout is still in the future, for example after an earlier failed delivery or a scheduled delay):

    SELECT * FROM brighter_messages
    WHERE queue = 'your.queue' AND visible_timeout > CURRENT_TIMESTAMP;

  2. Verify the consumer is running and the subscription's routing key matches the publication's topic (see the sketch after this list)

  3. Check database connection pooling isn't exhausted

  4. Review logs for consumer exceptions
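
For point 2, the subscription's routingKey must be exactly the same string as the publication's Topic; if they differ, the producer and the consumer use different queues in the store table. A quick reminder, reusing the names from the configuration examples above:

// The producer writes to the queue named by the publication's Topic...
var publication = new PostgresPublication<OrderCreatedEvent>
{
    Topic = new RoutingKey("orders.created")
};

// ...and the consumer must poll that same queue via the subscription's routingKey.
var subscription = new PostgresSubscription<OrderCreatedEvent>(
    channelName: new ChannelName("orders.created.consumer"),
    routingKey: new RoutingKey("orders.created")  // must match the Topic above
);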

High Database Load

Problem: PostgreSQL CPU/disk usage is high.

Solutions:

  1. Verify index exists on (queue, visible_timeout)

  2. Use JSONB instead of JSON for better performance

  3. Reduce BufferSize if retrieving too many messages at once

  4. Consider partitioning the queue table for high volume

  5. Use connection pooling to reduce connection overhead

Messages Processed Multiple Times

Problem: Same message processed by multiple consumers.

Solutions:

  1. Increase visibleTimeout to allow more processing time

  2. Implement Inbox pattern for idempotency

  3. Check for long-running handlers that exceed visibility timeout

  4. Verify only one consumer process per subscription

Slow Message Retrieval

Problem: Consumer polls are slow.

Solutions:

  1. Add index: CREATE INDEX ON brighter_messages(queue, visible_timeout)

  2. Use JSONB instead of JSON

  3. Increase timeOut to reduce polling frequency

  4. Consider using bufferSize > 1 to retrieve multiple messages per poll
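
Points 3 and 4 both come down to subscription tuning. A sketch of a subscription adjusted to retrieve more messages per poll with a longer poll timeout (the values are illustrative only):

var subscription = new PostgresSubscription<OrderCreatedEvent>(
    channelName: new ChannelName("orders.created.consumer"),
    routingKey: new RoutingKey("orders.created"),
    bufferSize: 10,                      // retrieve up to 10 messages per poll
    timeOut: TimeSpan.FromSeconds(30)    // longer timeout reduces polling frequency
);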

