🚌 Message Buses
Message buses are used to spread messages to multiple listeners. Contrary to message queues, they do not store the message or wait for the listeners to respond. Often, filter patterns can also be used to decide whether or not a listener is triggered, based on the message content.
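To make the fan-out and filtering behavior concrete, here is a minimal in-memory sketch. It is not how Castore or any messaging service implements a bus: `InMemoryBus`, `subscribe`, the `filter` predicate and the `TRAINER_REGISTERED` event type are hypothetical names used for illustration only.

```typescript
// Hypothetical message shape, loosely modelled on the messages shown on this page
type BusMessage = {
  eventStoreId: string;
  event: { type: string };
};

type Listener = (message: BusMessage) => void;

type Subscription = {
  // Filter pattern: the listener is only triggered when this returns true
  filter: (message: BusMessage) => boolean;
  listener: Listener;
};

// Illustrative in-memory bus: fans messages out and stores nothing
class InMemoryBus {
  private subscriptions: Subscription[] = [];

  subscribe(filter: Subscription['filter'], listener: Listener): void {
    this.subscriptions.push({ filter, listener });
  }

  publish(message: BusMessage): void {
    // Every matching listener receives the message. It is not retained
    // afterwards, so a listener subscribed later will never see it.
    for (const { filter, listener } of this.subscriptions) {
      if (filter(message)) {
        listener(message);
      }
    }
  }
}

const bus = new InMemoryBus();
const receivedByPokemonListener: string[] = [];
const receivedByAuditListener: string[] = [];

// 👇 Only triggered by messages coming from the POKEMONS event store
bus.subscribe(
  message => message.eventStoreId === 'POKEMONS',
  message => receivedByPokemonListener.push(message.event.type),
);
// 👇 Triggered by every message
bus.subscribe(
  () => true,
  message => receivedByAuditListener.push(message.event.type),
);

bus.publish({ eventStoreId: 'POKEMONS', event: { type: 'POKEMON_LEVELED_UP' } });
bus.publish({ eventStoreId: 'TRAINERS', event: { type: 'TRAINER_REGISTERED' } });
```

In this sketch the first listener ends up with one message and the second with two, which is the difference a filter pattern makes.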
You can use the AggregateExistsMessageBus, NotificationMessageBus or StateCarryingMessageBus classes to implement message buses:
import { NotificationMessageBus } from '@castore/core';
const appMessageBus = new NotificationMessageBus({
  messageBusId: 'APP_MESSAGE_BUS',
  sourceEventStores: [pokemonsEventStore, trainersEventStore...],
});
await appMessageBus.publishMessage({
  // 👇 Typed as NotificationMessage of one of the source event stores
  eventStoreId: 'POKEMONS',
  event: {
    type: 'POKEMON_LEVELED_UP',
    ...
  }
});
// Similar for AggregateExistsMessageBus and StateCarryingMessageBus
Similarly to event stores, MessageBus classes provide a boilerplate-free and type-safe interface to publish messages, but are NOT responsible for actually doing so. This is the responsibility of the MessageBusAdapter, that will connect it to your actual messaging solution:
import { NotificationMessageBus } from '@castore/core';
await messageBus.publishMessage(...);
// ❌ Will throw an `UndefinedMessageChannelAdapterError` if no adapter has been provided
const messageBus = new NotificationMessageBus({
  ...
  // 👇 Provide it in the constructor
  messageBusAdapter: mySuperMessageBusAdapter,
});
// 👇 ...or set/switch it in context later
messageBus.messageChannelAdapter = mySuperMessageBusAdapter;
await messageBus.publishMessage(...);
// 🙌 Will work!
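The split of responsibilities described above can be sketched without any dependency. This is an illustration of the delegation pattern only, not Castore's source code: `SketchMessageBus`, the simplified `Message` type and the local `MessageChannelAdapter` and `UndefinedMessageChannelAdapterError` declarations are stand-ins written for this example.

```typescript
// Assumption: simplified stand-ins, not the types exported by '@castore/core'
type Message = { eventStoreId: string; event: { type: string } };

interface MessageChannelAdapter {
  publishMessage: (message: Message) => Promise<void>;
}

class UndefinedMessageChannelAdapterError extends Error {
  constructor(messageBusId: string) {
    super(`No message channel adapter set on message bus ${messageBusId}`);
    this.name = 'UndefinedMessageChannelAdapterError';
  }
}

class SketchMessageBus {
  // 👇 Not read-only: can be set or switched after construction
  messageChannelAdapter: MessageChannelAdapter | undefined;

  constructor(
    readonly messageBusId: string,
    messageBusAdapter?: MessageChannelAdapter,
  ) {
    this.messageChannelAdapter = messageBusAdapter;
  }

  async publishMessage(message: Message): Promise<void> {
    if (this.messageChannelAdapter === undefined) {
      throw new UndefinedMessageChannelAdapterError(this.messageBusId);
    }
    // The bus only delegates: the adapter talks to the messaging solution
    await this.messageChannelAdapter.publishMessage(message);
  }
}

const sent: Message[] = [];
const recordingAdapter: MessageChannelAdapter = {
  publishMessage: async message => {
    sent.push(message);
  },
};

const runSketch = async (): Promise<{ failedFirst: boolean; sentCount: number }> => {
  const sketchBus = new SketchMessageBus('APP_MESSAGE_BUS');
  const message: Message = {
    eventStoreId: 'POKEMONS',
    event: { type: 'POKEMON_LEVELED_UP' },
  };

  let failedFirst = false;
  try {
    // ❌ No adapter yet
    await sketchBus.publishMessage(message);
  } catch (error) {
    failedFirst =
      error instanceof Error &&
      error.name === 'UndefinedMessageChannelAdapterError';
  }

  // 👇 Set the adapter later, then publish again
  sketchBus.messageChannelAdapter = recordingAdapter;
  await sketchBus.publishMessage(message);

  return { failedFirst, sentCount: sent.length };
};
```

Keeping the adapter as a plain mutable property is what makes it possible to construct the bus in shared code and plug the messaging solution in later, for instance per environment.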
You can code your own MessageBusAdapter (simply implement the MessageChannelAdapter interface), but we highly recommend using an off-the-shelf adapter (if the messaging solution that you use is missing, feel free to create/upvote an issue, or contribute 🤗).
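As a rough idea of what a hand-written adapter could look like, here is an in-memory sketch, for instance for unit tests. The `MessageChannelAdapter` interface is re-declared locally so that the snippet is self-contained: its shape is an assumption inferred from the `publishMessage` and `publishMessages` signatures listed in the reference, so check the actual interface exported by `@castore/core` before relying on it. `InMemoryMessageBusAdapter`, `PublishOptions` and the simplified `Message` type are hypothetical.

```typescript
// Assumption: simplified stand-ins for the types exported by '@castore/core'
type Message = { eventStoreId: string; event: { type: string } };
type PublishOptions = { replay?: boolean };

interface MessageChannelAdapter {
  publishMessage: (message: Message, options?: PublishOptions) => Promise<void>;
  publishMessages: (messages: Message[], options?: PublishOptions) => Promise<void>;
}

// Hypothetical adapter that records messages in memory instead of sending them
class InMemoryMessageBusAdapter implements MessageChannelAdapter {
  readonly published: { message: Message; replay: boolean }[] = [];

  publishMessage = async (
    message: Message,
    options: PublishOptions = {},
  ): Promise<void> => {
    // `replay` defaults to false when the option is not provided
    this.published.push({ message, replay: options.replay ?? false });
  };

  publishMessages = async (
    messages: Message[],
    options: PublishOptions = {},
  ): Promise<void> => {
    for (const message of messages) {
      await this.publishMessage(message, options);
    }
  };
}

const mySuperMessageBusAdapter = new InMemoryMessageBusAdapter();

const runAdapterDemo = async (): Promise<boolean[]> => {
  const message: Message = {
    eventStoreId: 'POKEMONS',
    event: { type: 'POKEMON_LEVELED_UP' },
  };
  await mySuperMessageBusAdapter.publishMessage(message);
  await mySuperMessageBusAdapter.publishMessages([message, message], {
    replay: true,
  });
  // One entry per published message, with its replay flag
  return mySuperMessageBusAdapter.published.map(({ replay }) => replay);
};
```

A real adapter would replace the `push` with a call to the messaging solution's client and decide how to carry the `replay` flag downstream.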
The adapter packages will also expose useful generics to type the arguments of your bus listeners. For instance:
import type { EventBridgeMessageBusMessage } from '@castore/message-bus-adapter-event-bridge';
const pokemonMessagesListener = async (
  // 👇 Specify that you only listen to the pokemonsEventStore messages
  eventBridgeMessage: EventBridgeMessageBusMessage<
    typeof appMessageBus,
    'POKEMONS'
  >,
) => {
  // 👇 Correctly typed!
  const message = eventBridgeMessage.detail;
};
🔧 Reference
Constructor:
messageBusId (string): A string identifying the message bus
sourceEventStores (EventStore[]): List of event stores that the message bus will broadcast events from
messageBusAdapter (?MessageChannelAdapter): Message bus adapter
Properties:
messageBusId (string)
const appMessageBusId = appMessageBus.messageBusId;
// => 'APP_MESSAGE_BUS'
sourceEventStores (EventStore[])
const appMessageBusSourceEventStores = appMessageBus.sourceEventStores;
// => [pokemonsEventStore, trainersEventStore...]
messageChannelAdapter (?MessageChannelAdapter): Returns the associated message bus adapter (potentially undefined)
const appMessageBusAdapter = appMessageBus.messageChannelAdapter;
// => undefined (we did not provide one in this example)
☝️ The messageChannelAdapter is not read-only so you do not have to provide it right away.
Async Methods:
The following methods interact with the messaging solution of your application through a MessageBusAdapter. They will throw an UndefinedMessageChannelAdapterError if you did not provide one.
publishMessage ((message: Message, opt?: OptionsObj) => Promise<void>): Publish a Message (of the appropriate type) to the message bus. OptionsObj contains the following properties:
  replay (?boolean = false): Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately. Check the implementation of your adapter for more details.
publishMessages ((messages: Message[], opt?: OptionsObj) => Promise<void>): Publish several Messages (of the appropriate type) to the message bus. Options are similar to the publishMessage options.
getAggregateAndPublishMessage ((message: NotificationMessage) => Promise<void>): (StateCarryingMessageBuses only) Append the matching aggregate (with correct version) to a NotificationMessage and turn it into a StateCarryingMessage before publishing it to the message bus. Uses the message bus event stores: make sure that they have correct adapters set up.
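The idea behind getAggregateAndPublishMessage can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: the message and aggregate types are simplified, and `toStateCarryingMessage`, `GetAggregate` and the toy `getPokemonAggregate` lookup are hypothetical helpers standing in for a read on the source event store.

```typescript
// Assumption: simplified shapes, not the types exported by '@castore/core'
type NotificationMessage = {
  eventStoreId: string;
  event: { aggregateId: string; version: number; type: string };
};

type Aggregate = { aggregateId: string; version: number };

type StateCarryingMessage<A extends Aggregate> = NotificationMessage & {
  aggregate: A;
};

// Hypothetical lookup: rebuilds the aggregate as of `maxVersion`
type GetAggregate<A extends Aggregate> = (
  aggregateId: string,
  maxVersion: number,
) => Promise<A | undefined>;

const toStateCarryingMessage = async <A extends Aggregate>(
  message: NotificationMessage,
  getAggregate: GetAggregate<A>,
): Promise<StateCarryingMessage<A>> => {
  const { aggregateId, version } = message.event;
  // Ask for the aggregate at the event's version rather than the latest one,
  // so that the carried state matches the event
  const aggregate = await getAggregate(aggregateId, version);
  if (aggregate === undefined) {
    throw new Error(`Aggregate ${aggregateId} not found`);
  }
  return { ...message, aggregate };
};

type PokemonAggregate = Aggregate & { level: number };

// Toy lookup: in this example a pokemon gains one level per event
const getPokemonAggregate: GetAggregate<PokemonAggregate> = async (
  aggregateId,
  maxVersion,
) => ({ aggregateId, version: maxVersion, level: maxVersion });

const stateCarryingMessagePromise = toStateCarryingMessage(
  {
    eventStoreId: 'POKEMONS',
    event: { aggregateId: 'pikachu1', version: 3, type: 'POKEMON_LEVELED_UP' },
  },
  getPokemonAggregate,
);
```

Because the aggregate has to be read before publishing, the lookup needs a working storage adapter, which is why the source event stores must be correctly set up.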
Type Helpers:
MessageChannelMessage: Given a MessageBus, returns the TS type of its messages
import type { MessageChannelMessage } from '@castore/core';
type AppMessage = MessageChannelMessage<typeof appMessageBus>;
// 👇 Equivalent to:
type AppMessage = EventStoreNotificationMessage<
  typeof pokemonsEventStore | typeof trainersEventStore...
>;
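For readers curious about how such a helper can derive a union of message types from the source event stores, here is a simplified sketch based on conditional types. It does not reproduce the definitions in `@castore/core`: every `Sketch...` type, the `eventTypes` field and the `POKEMON_APPEARED` and `TRAINER_REGISTERED` event types are hypothetical.

```typescript
// Hypothetical, simplified event store and message bus shapes
type SketchEventStore<Id extends string, EventType extends string> = {
  eventStoreId: Id;
  eventTypes: EventType[];
};

type SketchMessageBus<Stores extends SketchEventStore<string, string>> = {
  messageBusId: string;
  sourceEventStores: Stores[];
};

// Distributes over a union of event stores: one message type per store
type SketchNotificationMessage<Store> =
  Store extends SketchEventStore<infer Id, infer EventType>
    ? { eventStoreId: Id; event: { type: EventType } }
    : never;

// Extracts the source event stores of a bus, then maps them to messages
type SketchMessageChannelMessage<Bus> =
  Bus extends SketchMessageBus<infer Stores>
    ? SketchNotificationMessage<Stores>
    : never;

const sketchPokemonsEventStore: SketchEventStore<
  'POKEMONS',
  'POKEMON_APPEARED' | 'POKEMON_LEVELED_UP'
> = {
  eventStoreId: 'POKEMONS',
  eventTypes: ['POKEMON_APPEARED', 'POKEMON_LEVELED_UP'],
};

const sketchTrainersEventStore: SketchEventStore<
  'TRAINERS',
  'TRAINER_REGISTERED'
> = {
  eventStoreId: 'TRAINERS',
  eventTypes: ['TRAINER_REGISTERED'],
};

const sketchAppMessageBus: SketchMessageBus<
  typeof sketchPokemonsEventStore | typeof sketchTrainersEventStore
> = {
  messageBusId: 'APP_MESSAGE_BUS',
  sourceEventStores: [sketchPokemonsEventStore, sketchTrainersEventStore],
};

type SketchAppMessage = SketchMessageChannelMessage<typeof sketchAppMessageBus>;

// 👇 Accepted: the event type belongs to the POKEMONS event store
const validMessage: SketchAppMessage = {
  eventStoreId: 'POKEMONS',
  event: { type: 'POKEMON_LEVELED_UP' },
};

// 👇 Rejected by the compiler: 'TRAINER_REGISTERED' is not a POKEMONS event
// const invalidMessage: SketchAppMessage = {
//   eventStoreId: 'POKEMONS',
//   event: { type: 'TRAINER_REGISTERED' },
// };
```

The key design point is that `eventStoreId` acts as a discriminant, so narrowing on it also narrows the possible event types.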