Skip to main content

📙 Event Store

Once you've defined your event types and how to aggregate them, you can bundle them together in an EventStore class. Each event store in your application represents a business entity.

note

Think of event stores as "what tables would be in CRUD", except that instead of directly updating data, you just append new events to it!

Event Store

In Castore, EventStore classes are NOT responsible for actually storing data (this will come with event storage adapters). But rather to provide a boilerplate-free and type-safe interface to perform many actions such as:

  • Listing aggregate ids
  • Accessing events of an aggregate
  • Building an aggregate with the reducer
  • Pushing new events etc.
import { EventStore } from '@castore/core';

const pokemonsEventStore = new EventStore({
eventStoreId: 'POKEMONS',
eventTypes: [
pokemonAppearedEventType,
pokemonCaughtEventType,
pokemonLeveledUpEventType,
...
],
reducer: pokemonsReducer,
});
// ...and that's it 🥳
info

☝️ The EventStore class is the heart of Castore, it even gave it its name!

🔧 Reference

Constructor:

  • eventStoreId (string): A string identifying the event store
  • eventTypes (EventType[]): The list of event types in the event store
  • reduce (EventType[]): A reducer function that can be applied to the store event types
  • onEventPushed (?(pushEventResponse: PushEventResponse) => Promise<void>): To run a callback after events are pushed (input is exactly the return value of the pushEvent method)
  • eventStorageAdapter (?EventStorageAdapter): See fetching events

☝️ The return type of the reducer is used to infer the Aggregate type of the EventStore, so it is important to type it explicitely.


Properties:

  • eventStoreId (string)
const pokemonsEventStoreId = pokemonsEventStore.eventStoreId;
// => 'POKEMONS'
  • eventTypes (EventType[])
const pokemonsEventTypes = pokemonsEventStore.eventTypes;
// => [pokemonAppearedEventType, pokemonCaughtEventType...]
  • reduce ((Aggregate, EventType) => Aggregate)
const reducer = pokemonsEventStore.reduce;
// => pokemonsReducer
  • onEventPushed (?(pushEventResponse: PushEventResponse) => Promise<void>): Callback to run after events are pushed
const onEventPushed = pokemonsEventStore.onEventPushed;
// => undefined (we did not provide one in this example)
const eventStorageAdapter = pokemonsEventStore.eventStorageAdapter;
// => undefined (we did not provide one in this example)

☝️ The eventStorageAdapter is not read-only so you do not have to provide it right away.


Sync Methods:

  • getEventStorageAdapter (() => EventStorageAdapter): Returns the event store event storage adapter if it exists. Throws an UndefinedEventStorageAdapterError if it doesn't.
import { UndefinedEventStorageAdapterError } from '@castore/core';

expect(() => pokemonsEventStore.getEventStorageAdapter()).toThrow(
new UndefinedEventStorageAdapterError({ eventStoreId: 'POKEMONS' }),
);
// => true
  • buildAggregate ((eventDetails: EventDetail[], initialAggregate?: Aggregate) => Aggregate | undefined): Applies the event store reducer to a serie of events.
const myPikachuAggregate = pokemonsEventStore.buildAggregate(myPikachuEvents);
  • groupEvent ((eventDetail: EventDetail, opt?: OptionsObj) => GroupedEvent): See joining data.

Async Methods:

The following methods interact with the data layer of your event store through its EventStorageAdapter. They will throw an UndefinedEventStorageAdapterError if you did not provide one.

  • getEvents ((aggregateId: string, opt?: OptionsObj) => Promise<ResponseObj>): Retrieves the events of an aggregate, ordered by version. Returns an empty array if no event is found for this aggregateId.

    OptionsObj contains the following properties:

    • minVersion (?number): To retrieve events above a certain version
    • maxVersion (?number): To retrieve events below a certain version
    • limit (?number): Maximum number of events to retrieve
    • reverse (?boolean = false): To retrieve events in reverse order (does not require to swap minVersion and maxVersion)

    ResponseObj contains the following properties:

    • events (EventDetail[]): The aggregate events (possibly empty)
const { events: allEvents } = await pokemonsEventStore.getEvents(myPikachuId);
// => typed as PokemonEventDetail[] 🙌

// 👇 Retrieve a range of events
const { events: rangedEvents } = await pokemonsEventStore.getEvents(
myPikachuId,
{
minVersion: 2,
maxVersion: 5,
},
);

// 👇 Retrieve the last event of the aggregate
const { events: onlyLastEvent } = await pokemonsEventStore.getEvents(
myPikachuId,
{
reverse: true,
limit: 1,
},
);
  • getAggregate ((aggregateId: string, opt?: OptionsObj) => Promise<ResponseObj>): Retrieves the events of an aggregate and build it.

    OptionsObj contains the following properties:

    • maxVersion (?number): To retrieve aggregate below a certain version

    ResponseObj contains the following properties:

    • aggregate (?Aggregate): The aggregate (possibly undefined)
    • events (EventDetail[]): The aggregate events (possibly empty)
    • lastEvent (?EventDetail): The last event (possibly undefined)
const { aggregate: myPikachu } = await pokemonsEventStore.getAggregate(
myPikachuId,
);
// => typed as PokemonAggregate | undefined 🙌

// 👇 Retrieve an aggregate below a certain version
const { aggregate: pikachuBelowVersion5 } =
await pokemonsEventStore.getAggregate(myPikachuId, { maxVersion: 5 });

// 👇 Returns the events if you need them
const { aggregate, events } = await pokemonsEventStore.getAggregate(
myPikachuId,
);
  • getExistingAggregate ((aggregateId: string, opt?: OptionsObj) => Promise<ResponseObj>): Same as getAggregate method, but ensures that the aggregate exists. Throws an AggregateNotFoundError if no event is found for this aggregateId.
import { AggregateNotFoundError } from '@castore/core';

expect(async () =>
pokemonsEventStore.getExistingAggregate(unexistingId),
).resolves.toThrow(
new AggregateNotFoundError({
eventStoreId: 'POKEMONS',
aggregateId: unexistingId,
}),
);
// true

const { aggregate } = await pokemonsEventStore.getExistingAggregate(
aggregateId,
);
// => 'aggregate' and 'lastEvent' are always defined 🙌
  • pushEvent ((eventDetail: EventDetail, opt?: OptionsObj) => Promise<ResponseObj>): Pushes a new event to the event store. The timestamp is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as new Date().toISOString(). Throws an EventAlreadyExistsError if an event already exists for the corresponding aggregateId and version (see section on race conditions).

    OptionsObj contains the following properties:

    • prevAggregate (?Aggregate): The aggregate at the current version, i.e. before having pushed the event. Can be useful in some cases like when using the ConnectedEventStore class
    • force (?boolean): To force push the event even if one already exists for the corresponding aggregateId and version. Any existing event will be overridden, so use with extra care, mainly in data migrations.

    ResponseObj contains the following properties:

    • event (EventDetail): The complete event (includes the timestamp)
    • nextAggregate (?Aggregate): The aggregate at the new version, i.e. after having pushed the event. Returned only if the event is an initial event, if the prevAggregate option was provided, or when using a ConnectedEventStore class connected to a state-carrying message bus or queue
const { event: completeEvent, nextAggregate } =
await pokemonsEventStore.pushEvent(
{
aggregateId: myPikachuId,
version: lastVersion + 1,
type: 'POKEMON_LEVELED_UP', // <= event type is correctly typed 🙌
payload, // <= payload is typed according to the provided event type 🙌
metadata, // <= same goes for metadata 🙌
// timestamp is optional
},
// Not required - Can be useful in some cases
{ prevAggregate },
);
  • listAggregateIds ((opt?: OptionsObj) => Promise<ResponseObj>): Retrieves the list of aggregateId of an event store, ordered by the timestamp of their initial event. Returns an empty array if no aggregate is found.

    OptionsObj contains the following properties:

    • limit (?number): Maximum number of aggregate ids to retrieve
    • initialEventAfter (?string): To retrieve aggregate ids that appeared after a certain timestamp
    • initialEventBefore (?string): To retrieve aggregate ids that appeared before a certain timestamp
    • reverse (?boolean): To retrieve the aggregate ids in reverse order
    • pageToken (?string): To retrieve a paginated result of aggregate ids

    ResponseObj contains the following properties:

    • aggregateIds (string[]): The list of aggregate ids
    • nextPageToken (?string): A token for the next page of aggregate ids if one exists. The nextPageToken carries the previously used options, so you do not have to provide them again (though you can still do it to override them).
const accAggregateIds: string = [];

const { aggregateIds: firstPage, nextPageToken } =
await pokemonsEventStore.listAggregateIds({ limit: 20 });

accAggregateIds.push(...firstPage);

if (nextPageToken) {
const { aggregateIds: secondPage } =
await pokemonsEventStore.listAggregateIds({
// 👇 Previous limit of 20 is passed through the page token
pageToken: nextPageToken,
});
accAggregateIds.push(...secondPage);
}

Type Helpers:

  • EventStoreId: Returns the EventStore id
import type { EventStoreId } from '@castore/core';

type PokemonsEventStoreId = EventStoreId<typeof pokemonsEventStore>;
// => 'POKEMONS'
  • EventStoreEventTypes: Returns the EventStore list of events types
import type { EventStoreEventTypes } from '@castore/core';

type PokemonEventTypes = EventStoreEventTypes<typeof pokemonsEventStore>;
// => [typeof pokemonAppearedEventType, typeof pokemonCaughtEventType...]
  • EventStoreEventDetails: Returns the union of all the EventStore possible events details
import type { EventStoreEventDetails } from '@castore/core';

type PokemonEventDetails = EventStoreEventDetails<typeof pokemonsEventStore>;
// => EventTypeDetail<typeof pokemonAppearedEventType>
// | EventTypeDetail<typeof pokemonCaughtEventType>
// | ...
  • EventStoreReducer: Returns the EventStore reducer
import type { EventStoreReducer } from '@castore/core';

type PokemonsReducer = EventStoreReducer<typeof pokemonsEventStore>;
// => Reducer<PokemonAggregate, PokemonEventDetails>
  • EventStoreAggregate: Returns the EventStore aggregate
import type { EventStoreAggregate } from '@castore/core';

type SomeAggregate = EventStoreAggregate<typeof pokemonsEventStore>;
// => PokemonAggregate