🚌 Message Buses

Message buses broadcast messages to multiple listeners. Unlike message queues, they do not store messages or wait for listeners to respond. Filter patterns can often be used to decide, based on the message content, whether a given listener is triggered.
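To make the fan-out and filtering behaviour concrete, here is a minimal in-memory sketch. It is not how Castore or any real messaging service is implemented: `InMemoryBus`, `BusListener`, `BusMessage` and the `TRAINER_REGISTERED` event type are hypothetical names introduced only for illustration.

```typescript
// Hypothetical message shape, loosely modelled on the examples in this page.
type BusMessage = { eventStoreId: string; event: { type: string } };

type BusListener = {
  // Filter pattern: decides whether this listener is triggered by a message
  matches: (message: BusMessage) => boolean;
  handle: (message: BusMessage) => void;
};

// Fan-out without storage: each message is handed to the matching listeners,
// then forgotten. Nothing is queued and no response is awaited.
class InMemoryBus {
  private readonly listeners: BusListener[] = [];

  subscribe(listener: BusListener): void {
    this.listeners.push(listener);
  }

  publish(message: BusMessage): void {
    for (const listener of this.listeners) {
      if (listener.matches(message)) {
        listener.handle(message);
      }
    }
  }
}

const deliveries: string[] = [];
const demoBus = new InMemoryBus();

// Only triggered by messages coming from the POKEMONS event store
demoBus.subscribe({
  matches: message => message.eventStoreId === 'POKEMONS',
  handle: message => {
    deliveries.push(`pokemons-only <- ${message.event.type}`);
  },
});

// Triggered by every message
demoBus.subscribe({
  matches: () => true,
  handle: message => {
    deliveries.push(`audit <- ${message.event.type}`);
  },
});

demoBus.publish({ eventStoreId: 'POKEMONS', event: { type: 'POKEMON_LEVELED_UP' } });
demoBus.publish({ eventStoreId: 'TRAINERS', event: { type: 'TRAINER_REGISTERED' } });
```

The first message reaches both listeners, the second only reaches the catch-all one. A listener subscribed after a message was published never sees it, which is the main difference from a queue.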

You can use the AggregateExistsMessageBus, NotificationMessageBus or StateCarryingMessageBus classes to implement message buses:

import { NotificationMessageBus } from '@castore/core';

const appMessageBus = new NotificationMessageBus({
  messageBusId: 'APP_MESSAGE_BUS',
  sourceEventStores: [pokemonsEventStore, trainersEventStore...],
});

await appMessageBus.publishMessage({
  // 👇 Typed as NotificationMessage of one of the source event stores
  eventStoreId: 'POKEMONS',
  event: {
    type: 'POKEMON_LEVELED_UP',
    ...
  },
});

// Similar for AggregateExistsMessageBus and StateCarryingMessageBus

Like event stores, MessageBus classes provide a boilerplate-free and type-safe interface to publish messages, but they are NOT responsible for actually publishing them. That is the responsibility of the MessageBusAdapter, which connects the message bus to your actual messaging solution:

import { NotificationMessageBus } from '@castore/core';

await messageBus.publishMessage(...);
// ❌ Will throw an `UndefinedMessageChannelAdapterError`

const messageBus = new NotificationMessageBus({
  ...
  // 👇 Provide it in the constructor
  messageBusAdapter: mySuperMessageBusAdapter,
});

// 👇 ...or set/switch it in context later
messageBus.messageChannelAdapter = mySuperMessageBusAdapter;

await messageBus.publishMessage(...);
// 🙌 Will work!
info

You can code your own MessageBusAdapter (simply implement the MessageChannelAdapter interface), but we highly recommend using an off-the-shelf adapter. If the messaging solution that you use is missing, feel free to create/upvote an issue, or contribute 🤗.
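As a rough illustration of what a hand-written adapter could look like, here is an in-memory sketch of the kind that can be handy in unit tests. The `MessageChannelAdapterLike` interface, the `Message` and `PublishOptions` types and the `InMemoryBusAdapter` class are local stand-ins written for this example. Their shapes are assumptions derived from the method signatures listed in the Reference section, so check the actual MessageChannelAdapter interface exported by @castore/core before relying on them.

```typescript
// Local stand-ins (assumed shapes, NOT the @castore/core definitions)
type Message = {
  eventStoreId: string;
  event: { aggregateId: string; version: number; type: string };
};
type PublishOptions = { replay?: boolean };

interface MessageChannelAdapterLike {
  publishMessage: (message: Message, options?: PublishOptions) => Promise<void>;
  publishMessages: (messages: Message[], options?: PublishOptions) => Promise<void>;
}

// Hypothetical adapter that records published messages in memory
// instead of forwarding them to a real messaging solution.
class InMemoryBusAdapter implements MessageChannelAdapterLike {
  readonly published: { message: Message; replay: boolean }[] = [];

  publishMessage = async (
    message: Message,
    options: PublishOptions = {},
  ): Promise<void> => {
    this.published.push({ message, replay: options.replay ?? false });
  };

  publishMessages = async (
    messages: Message[],
    options: PublishOptions = {},
  ): Promise<void> => {
    for (const message of messages) {
      this.published.push({ message, replay: options.replay ?? false });
    }
  };
}

const demoAdapter = new InMemoryBusAdapter();

demoAdapter
  .publishMessages(
    [
      {
        eventStoreId: 'POKEMONS',
        event: { aggregateId: 'pikachu1', version: 1, type: 'POKEMON_APPEARED' },
      },
      {
        eventStoreId: 'POKEMONS',
        event: { aggregateId: 'pikachu1', version: 2, type: 'POKEMON_LEVELED_UP' },
      },
    ],
    { replay: true },
  )
  .then(() => console.log(`${demoAdapter.published.length} message(s) recorded`))
  .catch(console.error);
```

If the real interface matches these assumptions, such an adapter could be passed as messageBusAdapter in the constructor or assigned to messageChannelAdapter later, and a test could then assert on the content of `published`.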

The adapter packages will also expose useful generics to type the arguments of your bus listeners. For instance:

import type { EventBridgeMessageBusMessage } from '@castore/message-bus-adapter-event-bridge';

const pokemonMessagesListener = async (
  // 👇 Specify that you only listen to the pokemonsEventStore messages
  eventBridgeMessage: EventBridgeMessageBusMessage<
    typeof appMessageBus,
    'POKEMONS'
  >,
) => {
  // 👇 Correctly typed!
  const message = eventBridgeMessage.detail;
};
🔧 Reference

Constructor:

  • messageBusId (string): A string identifying the message bus
  • sourceEventStores (EventStore[]): List of event stores that the message bus will broadcast events from
  • messageBusAdapter (?MessageChannelAdapter): Message bus adapter

Properties:

  • messageBusId (string)
const appMessageBusId = appMessageBus.messageBusId;
// => 'APP_MESSAGE_BUS'
  • sourceEventStores (EventStore[])
const appMessageBusSourceEventStores = appMessageBus.sourceEventStores;
// => [pokemonsEventStore, trainersEventStore...]
  • messageChannelAdapter (?MessageChannelAdapter): Returns the associated message bus adapter (potentially undefined)
const appMessageBusAdapter = appMessageBus.messageChannelAdapter;
// => undefined (we did not provide one in this example)

☝️ The messageChannelAdapter is not read-only so you do not have to provide it right away.

Async Methods:

The following methods interact with the messaging solution of your application through a MessageBusAdapter. They will throw an UndefinedMessageChannelAdapterError if you did not provide one.

  • publishMessage ((message: Message, opt?: OptionsObj) => Promise<void>): Publish a Message (of the appropriate type) to the message bus.

    OptionsObj contains the following properties:

    • replay (?boolean = false): Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately. Check the implementation of your adapter for more details.
  • publishMessages ((messages: Message[], opt?: OptionsObj) => Promise<void>): Publish several Messages (of the appropriate type) to the message bus. Options are similar to the publishMessage options.
  • getAggregateAndPublishMessage ((message: NotificationMessage) => Promise<void>): (StateCarryingMessageBuses only) Append the matching aggregate (with the correct version) to a NotificationMessage, turning it into a StateCarryingMessage, before publishing it to the message bus. This uses the source event stores of the message bus, so make sure that they have the correct adapters set up.
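The conversion performed by getAggregateAndPublishMessage can be pictured with the following simplified sketch. It is not the library's implementation: the event, aggregate and message types, the `reducePokemon` reducer and the `toStateCarryingMessage` helper are all hypothetical, and the stored events are passed in as a plain array instead of being fetched through an event store adapter. The point it illustrates is that the aggregate is rebuilt only up to the version of the event carried by the message.

```typescript
// Hypothetical types, for illustration only
type PokemonEvent = {
  aggregateId: string;
  version: number;
  type: 'POKEMON_APPEARED' | 'POKEMON_LEVELED_UP';
};
type PokemonAggregate = { aggregateId: string; version: number; level: number };
type PokemonNotificationMessage = { eventStoreId: 'POKEMONS'; event: PokemonEvent };
type PokemonStateCarryingMessage = PokemonNotificationMessage & {
  aggregate: PokemonAggregate;
};

// Hypothetical reducer: a pokemon appears at level 1, then gains one level per event
const reducePokemon = (
  aggregate: PokemonAggregate | undefined,
  event: PokemonEvent,
): PokemonAggregate => ({
  aggregateId: event.aggregateId,
  version: event.version,
  level: event.type === 'POKEMON_APPEARED' ? 1 : (aggregate?.level ?? 0) + 1,
});

// Rebuild the aggregate as it was right after the message's event,
// ignoring any event appended later, and attach it to the message.
const toStateCarryingMessage = (
  message: PokemonNotificationMessage,
  storedEvents: PokemonEvent[],
): PokemonStateCarryingMessage => {
  const aggregate = storedEvents
    .filter(
      event =>
        event.aggregateId === message.event.aggregateId &&
        event.version <= message.event.version,
    )
    .sort((eventA, eventB) => eventA.version - eventB.version)
    .reduce<PokemonAggregate | undefined>(
      (acc, event) => reducePokemon(acc, event),
      undefined,
    );

  if (aggregate === undefined) {
    throw new Error(`No events found for aggregate ${message.event.aggregateId}`);
  }

  return { ...message, aggregate };
};

const pikachuEvents: PokemonEvent[] = [
  { aggregateId: 'pikachu1', version: 1, type: 'POKEMON_APPEARED' },
  { aggregateId: 'pikachu1', version: 2, type: 'POKEMON_LEVELED_UP' },
  { aggregateId: 'pikachu1', version: 3, type: 'POKEMON_LEVELED_UP' },
];

// The message carries the version 2 event, so the version 3 event is ignored
const stateCarryingMessage = toStateCarryingMessage(
  {
    eventStoreId: 'POKEMONS',
    event: { aggregateId: 'pikachu1', version: 2, type: 'POKEMON_LEVELED_UP' },
  },
  pikachuEvents,
);
```

In the real method, the events come from the source event stores, which is why those stores need working adapters.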

Type Helpers:

  • MessageChannelMessage: Given a MessageBus, returns the TS type of its messages
import type {
  EventStoreNotificationMessage,
  MessageChannelMessage,
} from '@castore/core';

type AppMessage = MessageChannelMessage<typeof appMessageBus>;

// 👇 Equivalent to:
type AppMessage = EventStoreNotificationMessage<
typeof pokemonsEventStore | typeof trainersEventStore...
>;