Skip to main content

🔌 Connected Event Store

If your storage solution exposes data streaming capabilities (such as DynamoDB streams), you can leverage them to push your freshly written events to a message bus or queue.

Otherwise, you can use the ConnectedEventStore class. Its interface matches the EventStore one, but successfully pushing a new event will automatically forward it to a message queue/bus. Successfully pushing an event group will also automatically forward the events to their respective message queues/buses:

import { ConnectedEventStore } from '@castore/core';

const connectedPokemonsEventStore = new ConnectedEventStore(
// 👇 Original event store
pokemonsEventStore,
// 👇 Type-safe (appMessageBus MUST be able to carry pokemon events)
appMessageBus,
);

// Will push the event in the event store
// ...AND publish it to the message bus if it succeeds 🙌
await connectedPokemonsEventStore.pushEvent({
aggregateId: pokemonId,
version: 2,
type: 'POKEMON_LEVELED_UP',
...
});
info

Note that setting a connected event store eventStorageAdapter and onEventPushed properties will override those of the original event store instead.

If the message bus or queue is a state-carrying one, the pushEvent method will re-fetch the aggregate to append it to the message before publishing it. You can reduce this overhead by providing the previous aggregate as an option:

await connectedPokemonsEventStore.pushEvent(
{
aggregateId: pokemonId,
version: 2,
...
},
// 👇 Aggregate at version 1
{ prevAggregate: pokemonAggregate },
// Removes the need to re-fetch 🙌
);

await EventStore.pushEventGroup(
connectedPokemonsEventStore.groupEvent(
{ ... },
// Will also work on event groups 🙌
{ prevAggregate: pokemonAggregate },
),
);

Compared to data streams, connected event stores have the advantage of simplicity, performances and costs. However, they strongly decouple your storage and messaging solutions: Make sure to anticipate any issue that might arise (consistency, non-caught errors etc.).

Connected Event Store

🔧 Reference

Constructor:

  • eventStore (EventStore): The event store to connect
  • messageChannel (MessageBus | MessageQueue): A message bus or queue to forward events to

Properties:

A ConnectedEventStore will implement the interface of its original EventStore, and extend it with two additional properties:

  • eventStore (EventStore): The original event store
const eventStore = connectedPokemonsEventStore.eventStore;
// => pokemonsEventStore
  • messageChannel (MessageBus | MessageQueue): The provided message bus or queue
const messageChannel = connectedPokemonsEventStore.messageChannel;
// => appMessageBus

☝️ Note that the eventStorageAdapter property will act as a pointer toward the original event store eventStorageAdapter:

originalEventStore.eventStorageAdapter = myEventStorageAdapter;
connectedEventStore.eventStorageAdapter; // => myEventStorageAdapter

connectedEventStore.eventStorageAdapter = anotherEventStorageAdapter;
originalEventStore.eventStorageAdapter; // => anotherEventStorageAdapter