Skip to main content

📨 Message Queues

Message Queues store the published messages until they are handled by a worker. The worker is unique and predictible. It consumes all messages indifferently of their content.

Message Queue

You can use the AggregateExistsMessageQueue, NotificationMessageQueue or StateCarryingMessageQueue classes to implement message queues:

import { NotificationMessageQueue } from '@castore/core';

const appMessageQueue = new NotificationMessageQueue({
messageQueueId: 'APP_MESSAGE_QUEUE',
sourceEventStores: [pokemonsEventStore, trainersEventStore],
});

await appMessageQueue.publishMessage({
// 👇 Typed as NotificationMessage of one of the source event stores
eventStoreId: 'POKEMONS',
event: {
type: 'POKEMON_LEVELED_UP',
...
},
});

// Similar for AggregateExistsMessageQueue and StateCarryingMessageQueue

Similarly to event stores, MessageQueue classes provide a boilerplate-free and type-safe interface to publish messages, but are NOT responsible for actually doing so. This is the responsibility of the MessageQueueAdapter, that will connect it to your actual messaging solution:

import { EventStore } from '@castore/core';

await messageQueue.publishMessage(...);
// ❌ Will throw an `UndefinedMessageChannelAdapterError`

const messageQueue = new NotificationMessageQueue({
...
// 👇 Provide it in the constructor
messageQueueAdapter: mySuperMessageQueueAdapter,
});

// 👇 ...or set/switch it in context later
messageQueue.messageChannelAdapter = mySuperMessageQueueAdapter;

await messageQueue.publishMessage(...);
// 🙌 Will work!
info

You can code your own MessageQueueAdapter (simply implement the MessageChannelAdapter interface), but we highly recommend using an off-the-shelf adapter (if the messaging solution that you use does not have an adapter yet, feel free to create/upvote an issue, or contribute 🤗).

The adapter packages will also expose useful generics to type the arguments of your queue worker. For instance:

import type {
SQSMessageQueueMessage,
SQSMessageQueueMessageBody,
} from '@castore/message-queue-adapter-sqs';

const appMessagesWorker = async ({ Records }: SQSMessageQueueMessage) => {
for (const { body } of Records) {
// 👇 Correctly typed!
const recordBody: SQSMessageQueueMessageBody<typeof appMessageQueue> =
JSON.parse(body);
}
};
🔧 Reference

Constructor:

  • messageQueueId (string): A string identifying the message queue
  • sourceEventStores (EventStore[]): List of event stores that the message queue will broadcast events from
  • messageQueueAdapter (?MessageChannelAdapter): Message queue adapter

Properties:

  • messageChannelId (string)
const appMessageQueueId = appMessageQueue.messageChannelId;
// => 'APP_MESSAGE_QUEUE'
  • sourceEventStores (EventStore[])
const appMessageQueueSourceEventStores = appMessageQueue.sourceEventStores;
// => [pokemonsEventStore, trainersEventStore...]
  • messageChannelAdapter ?MessageChannelAdapter: Returns the associated message queue adapter (potentially undefined)
const appMessageQueueAdapter = appMessageQueue.messageChannelAdapter;
// => undefined (we did not provide one in this example)

☝️ The messageChannelAdapter is not read-only so you do not have to provide it right away.


Async Methods:

The following methods interact with the messaging solution of your application through a MessageQueueAdapter. They will throw an UndefinedMessageChannelAdapterError if you did not provide one.

  • publishMessage ((message: Message, opt?: OptionsObj) => Promise<void>): Publish a Message (of the appropriate type) to the message queue.

    OptionsObj contains the following properties:

    • replay (?boolean = false): Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately. Check the implementation of you adapter for more details.
  • publishMessages ((messages: Message[], opt?: OptionsObj) => Promise<void>): Publish several Messages (of the appropriate type) to the message queue. Options are similar to the publishMessage options.
  • getAggregateAndPublishMessage ((message: NotificationMessage) => Promise<void>): (StateCarryingMessageQueues only) Append the matching aggregate (with correct version) to a NotificationMessage and turn it into a StateCarryingMessage before publishing it to the message queue. Uses the message queue event stores: Make sure that they have correct adapters set up.

Type Helpers:

  • MessageChannelMessage: Given a MessageQueue, returns the TS type of its messages
import type { MessageChannelMessage } from '@castore/core';

type AppMessage = MessageChannelMessage<typeof appMessageQueue>;

// 👇 Equivalent to:
type AppMessage = EventStoreNotificationMessage<
typeof pokemonsEventStore | typeof trainersEventStore...
>;