📨 Message Queues
Message Queues store the published messages until they are handled by a worker. The worker is unique and predictible. It consumes all messages indifferently of their content.
You can use the AggregateExistsMessageQueue
, NotificationMessageQueue
or StateCarryingMessageQueue
classes to implement message queues:
import { NotificationMessageQueue } from '@castore/core';
const appMessageQueue = new NotificationMessageQueue({
messageQueueId: 'APP_MESSAGE_QUEUE',
sourceEventStores: [pokemonsEventStore, trainersEventStore],
});
await appMessageQueue.publishMessage({
// 👇 Typed as NotificationMessage of one of the source event stores
eventStoreId: 'POKEMONS',
event: {
type: 'POKEMON_LEVELED_UP',
...
},
});
// Similar for AggregateExistsMessageQueue and StateCarryingMessageQueue
Similarly to event stores, MessageQueue
classes provide a boilerplate-free and type-safe interface to publish messages, but are NOT responsible for actually doing so. This is the responsibility of the MessageQueueAdapter
, that will connect it to your actual messaging solution:
import { EventStore } from '@castore/core';
await messageQueue.publishMessage(...);
// ❌ Will throw an `UndefinedMessageChannelAdapterError`
const messageQueue = new NotificationMessageQueue({
...
// 👇 Provide it in the constructor
messageQueueAdapter: mySuperMessageQueueAdapter,
});
// 👇 ...or set/switch it in context later
messageQueue.messageChannelAdapter = mySuperMessageQueueAdapter;
await messageQueue.publishMessage(...);
// 🙌 Will work!
You can code your own MessageQueueAdapter
(simply implement the MessageChannelAdapter
interface), but we highly recommend using an off-the-shelf adapter (if the messaging solution that you use does not have an adapter yet, feel free to create/upvote an issue, or contribute 🤗).
The adapter packages will also expose useful generics to type the arguments of your queue worker. For instance:
import type {
SQSMessageQueueMessage,
SQSMessageQueueMessageBody,
} from '@castore/message-queue-adapter-sqs';
const appMessagesWorker = async ({ Records }: SQSMessageQueueMessage) => {
for (const { body } of Records) {
// 👇 Correctly typed!
const recordBody: SQSMessageQueueMessageBody<typeof appMessageQueue> =
JSON.parse(body);
}
};
🔧 Reference
Constructor:
messageQueueId (string)
: A string identifying the message queuesourceEventStores (EventStore[])
: List of event stores that the message queue will broadcast events frommessageQueueAdapter (?MessageChannelAdapter)
: Message queue adapter
Properties:
messageChannelId (string)
const appMessageQueueId = appMessageQueue.messageChannelId;
// => 'APP_MESSAGE_QUEUE'
sourceEventStores (EventStore[])
const appMessageQueueSourceEventStores = appMessageQueue.sourceEventStores;
// => [pokemonsEventStore, trainersEventStore...]
messageChannelAdapter ?MessageChannelAdapter
: Returns the associated message queue adapter (potentially undefined)
const appMessageQueueAdapter = appMessageQueue.messageChannelAdapter;
// => undefined (we did not provide one in this example)
☝️ The
messageChannelAdapter
is not read-only so you do not have to provide it right away.
Async Methods:
The following methods interact with the messaging solution of your application through a MessageQueueAdapter
. They will throw an UndefinedMessageChannelAdapterError
if you did not provide one.
publishMessage ((message: Message, opt?: OptionsObj) => Promise<void>)
: Publish aMessage
(of the appropriate type) to the message queue.OptionsObj
contains the following properties:replay (?boolean = false)
: Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately. Check the implementation of you adapter for more details.
publishMessages ((messages: Message[], opt?: OptionsObj) => Promise<void>)
: Publish severalMessages
(of the appropriate type) to the message queue. Options are similar to thepublishMessage
options.getAggregateAndPublishMessage ((message: NotificationMessage) => Promise<void>)
: (StateCarryingMessageQueues only) Append the matching aggregate (with correct version) to aNotificationMessage
and turn it into aStateCarryingMessage
before publishing it to the message queue. Uses the message queue event stores: Make sure that they have correct adapters set up.
Type Helpers:
MessageChannelMessage
: Given aMessageQueue
, returns the TS type of its messages
import type { MessageChannelMessage } from '@castore/core';
type AppMessage = MessageChannelMessage<typeof appMessageQueue>;
// 👇 Equivalent to:
type AppMessage = EventStoreNotificationMessage<
typeof pokemonsEventStore | typeof trainersEventStore...
>;