📙 Event Store
Once you've defined your event types and how to aggregate them, you can bundle them together in an EventStore class. Each event store in your application represents a business entity.
Think of event stores as "what tables would be in CRUD", except that instead of directly updating data, you just append new events to it!

In Castore, EventStore classes are NOT responsible for actually storing data (this will come with event storage adapters). Rather, they provide a boilerplate-free and type-safe interface to perform many actions such as:
- Listing aggregate ids
- Accessing events of an aggregate
- Building an aggregate with the reducer
- Pushing new events etc.
import { EventStore } from '@castore/core';

const pokemonsEventStore = new EventStore({
  eventStoreId: 'POKEMONS',
  eventTypes: [
    pokemonAppearedEventType,
    pokemonCaughtEventType,
    pokemonLeveledUpEventType,
    ...
  ],
  reducer: pokemonsReducer,
});
// ...and that's it 🥳
☝️ The EventStore class is the heart of Castore: it even gave it its name!
🔧 Reference
Constructor:
- eventStoreId (string): A string identifying the event store
- eventTypes (EventType[]): The list of event types in the event store
- reducer ((Aggregate, EventType) => Aggregate): A reducer function that can be applied to the store event types
- onEventPushed (?(pushEventResponse: PushEventResponse) => Promise<void>): To run a callback after events are pushed (input is exactly the return value of the pushEvent method)
- eventStorageAdapter (?EventStorageAdapter): See fetching events

☝️ The return type of the reducer is used to infer the Aggregate type of the EventStore, so it is important to type it explicitly.
Properties:
- eventStoreId (string)
const pokemonsEventStoreId = pokemonsEventStore.eventStoreId;
// => 'POKEMONS'
- eventTypes (EventType[])
const pokemonsEventTypes = pokemonsEventStore.eventTypes;
// => [pokemonAppearedEventType, pokemonCaughtEventType...]
- reduce ((Aggregate, EventType) => Aggregate)
const reducer = pokemonsEventStore.reduce;
// => pokemonsReducer
- onEventPushed (?(pushEventResponse: PushEventResponse) => Promise<void>): Callback to run after events are pushed
const onEventPushed = pokemonsEventStore.onEventPushed;
// => undefined (we did not provide one in this example)
- eventStorageAdapter (?EventStorageAdapter): See fetching events
const eventStorageAdapter = pokemonsEventStore.eventStorageAdapter;
// => undefined (we did not provide one in this example)
☝️ The eventStorageAdapter is not read-only so you do not have to provide it right away.
Sync Methods:
- getEventStorageAdapter (() => EventStorageAdapter): Returns the event store's event storage adapter if it exists. Throws an UndefinedEventStorageAdapterError if it doesn't.
import { UndefinedEventStorageAdapterError } from '@castore/core';
expect(() => pokemonsEventStore.getEventStorageAdapter()).toThrow(
  new UndefinedEventStorageAdapterError({ eventStoreId: 'POKEMONS' }),
);
// => true
- buildAggregate ((eventDetails: EventDetail[], initialAggregate?: Aggregate) => Aggregate | undefined): Applies the event store reducer to a series of events.
const myPikachuAggregate = pokemonsEventStore.buildAggregate(myPikachuEvents);
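Conceptually, building an aggregate amounts to folding the reducer over the events in version order. The sketch below illustrates that idea with hypothetical, simplified types (PokemonEventSketch, PokemonAggregateSketch, reduceSketch and buildAggregateSketch are not part of Castore). It is an illustration under those assumptions, not the library's implementation.

```typescript
// Hypothetical, simplified event and aggregate shapes (not Castore types)
type PokemonEventSketch =
  | {
      aggregateId: string;
      version: number;
      type: 'POKEMON_APPEARED';
      payload: { name: string; level: number };
    }
  | { aggregateId: string; version: number; type: 'POKEMON_LEVELED_UP' }
  | { aggregateId: string; version: number; type: 'POKEMON_CAUGHT' };

type PokemonAggregateSketch = {
  aggregateId: string;
  version: number;
  name: string;
  level: number;
  status: 'wild' | 'caught';
};

// The explicit return type is what an aggregate type could be inferred from
const reduceSketch = (
  aggregate: PokemonAggregateSketch | undefined,
  event: PokemonEventSketch,
): PokemonAggregateSketch => {
  if (event.type === 'POKEMON_APPEARED') {
    return {
      aggregateId: event.aggregateId,
      version: event.version,
      name: event.payload.name,
      level: event.payload.level,
      status: 'wild',
    };
  }
  if (aggregate === undefined) {
    throw new Error('The first event must be POKEMON_APPEARED');
  }
  if (event.type === 'POKEMON_LEVELED_UP') {
    return { ...aggregate, version: event.version, level: aggregate.level + 1 };
  }
  return { ...aggregate, version: event.version, status: 'caught' };
};

// Folds the reducer over the events; yields undefined when there is
// neither an event nor an initial aggregate
const buildAggregateSketch = (
  events: PokemonEventSketch[],
  initialAggregate?: PokemonAggregateSketch,
): PokemonAggregateSketch | undefined =>
  events.reduce<PokemonAggregateSketch | undefined>(
    reduceSketch,
    initialAggregate,
  );
```

Passing an initial aggregate lets the fold resume from a known state instead of replaying every event from the first one.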
groupEvent ((eventDetail: EventDetail, opt?: OptionsObj) => GroupedEvent): See joining data.
Async Methods:
The following methods interact with the data layer of your event store through its EventStorageAdapter. They will throw an UndefinedEventStorageAdapterError if you did not provide one.
- getEvents ((aggregateId: string, opt?: OptionsObj) => Promise<ResponseObj>): Retrieves the events of an aggregate, ordered by version. Returns an empty array if no event is found for this aggregateId.
  OptionsObj contains the following properties:
  - minVersion (?number): To retrieve events above a certain version
  - maxVersion (?number): To retrieve events below a certain version
  - limit (?number): Maximum number of events to retrieve
  - reverse (?boolean = false): To retrieve events in reverse order (does not require swapping minVersion and maxVersion)
  ResponseObj contains the following properties:
  - events (EventDetail[]): The aggregate events (possibly empty)
const { events: allEvents } = await pokemonsEventStore.getEvents(myPikachuId);
// => typed as PokemonEventDetail[] 🙌

// 👇 Retrieve a range of events
const { events: rangedEvents } = await pokemonsEventStore.getEvents(
  myPikachuId,
  {
    minVersion: 2,
    maxVersion: 5,
  },
);

// 👇 Retrieve the last event of the aggregate
const { events: onlyLastEvent } = await pokemonsEventStore.getEvents(
  myPikachuId,
  {
    reverse: true,
    limit: 1,
  },
);
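To make the interplay of these options concrete, here is a minimal in-memory sketch of how they could combine. It assumes that minVersion and maxVersion are inclusive bounds and that limit applies after ordering; the text above does not state either point, so treat both as assumptions. VersionedEventSketch, GetEventsOptionsSketch and selectEventsSketch are hypothetical names, and the actual behavior depends on the event storage adapter.

```typescript
// Hypothetical minimal event shape (not a Castore type)
type VersionedEventSketch = {
  aggregateId: string;
  version: number;
  type: string;
};

type GetEventsOptionsSketch = {
  minVersion?: number;
  maxVersion?: number;
  limit?: number;
  reverse?: boolean;
};

const selectEventsSketch = (
  events: VersionedEventSketch[],
  { minVersion, maxVersion, limit, reverse = false }: GetEventsOptionsSketch = {},
): VersionedEventSketch[] => {
  // Assumption: both bounds are inclusive
  const inRange = events
    .filter(
      event =>
        (minVersion === undefined || event.version >= minVersion) &&
        (maxVersion === undefined || event.version <= maxVersion),
    )
    .sort((eventA, eventB) => eventA.version - eventB.version);

  // Reversing happens after range filtering, which is why the bounds
  // do not need to be swapped
  const ordered = reverse ? inRange.reverse() : inRange;

  // Assumption: the limit is applied last, on the ordered events
  return limit === undefined ? ordered : ordered.slice(0, limit);
};
```

Under these assumptions, combining reverse: true with limit: 1 selects the event with the highest version, which matches the "last event" example above.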
- getAggregate ((aggregateId: string, opt?: OptionsObj) => Promise<ResponseObj>): Retrieves the events of an aggregate and builds it.
  OptionsObj contains the following properties:
  - maxVersion (?number): To retrieve the aggregate below a certain version
  ResponseObj contains the following properties:
  - aggregate (?Aggregate): The aggregate (possibly undefined)
  - events (EventDetail[]): The aggregate events (possibly empty)
  - lastEvent (?EventDetail): The last event (possibly undefined)
const { aggregate: myPikachu } =
  await pokemonsEventStore.getAggregate(myPikachuId);
// => typed as PokemonAggregate | undefined 🙌

// 👇 Retrieve an aggregate below a certain version
const { aggregate: pikachuBelowVersion5 } =
  await pokemonsEventStore.getAggregate(myPikachuId, { maxVersion: 5 });

// 👇 Returns the events if you need them
const { aggregate, events } =
  await pokemonsEventStore.getAggregate(myPikachuId);
- getExistingAggregate ((aggregateId: string, opt?: OptionsObj) => Promise<ResponseObj>): Same as the getAggregate method, but ensures that the aggregate exists. Throws an AggregateNotFoundError if no event is found for this aggregateId.
import { AggregateNotFoundError } from '@castore/core';
// 👇 The method is async, so the error surfaces as a rejected promise
await expect(
  pokemonsEventStore.getExistingAggregate(unexistingId),
).rejects.toThrow(
  new AggregateNotFoundError({
    eventStoreId: 'POKEMONS',
    aggregateId: unexistingId,
  }),
);

const { aggregate } =
  await pokemonsEventStore.getExistingAggregate(aggregateId);
// => 'aggregate' and 'lastEvent' are always defined 🙌
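The difference between the two methods can be pictured as a thin wrapper that turns the "possibly undefined" result into a guaranteed one. The sketch below uses a hypothetical in-memory Map as the data layer and hypothetical names throughout (getAggregateSketch, getExistingAggregateSketch, AggregateNotFoundErrorSketch). It only illustrates the contract described above, not how Castore implements it.

```typescript
// Hypothetical minimal shapes (not Castore types)
type StoredEventSketch = { aggregateId: string; version: number; type: string };
type SimpleAggregateSketch = { aggregateId: string; version: number };

class AggregateNotFoundErrorSketch extends Error {
  constructor(aggregateId: string) {
    super(`No event found for aggregate ${aggregateId}`);
    this.name = 'AggregateNotFoundErrorSketch';
  }
}

// Hypothetical in-memory data layer, keyed by aggregateId
const eventsByAggregateIdSketch = new Map<string, StoredEventSketch[]>([
  [
    'pikachu1',
    [
      { aggregateId: 'pikachu1', version: 1, type: 'POKEMON_APPEARED' },
      { aggregateId: 'pikachu1', version: 2, type: 'POKEMON_LEVELED_UP' },
    ],
  ],
]);

const getAggregateSketch = async (
  aggregateId: string,
): Promise<{
  aggregate: SimpleAggregateSketch | undefined;
  events: StoredEventSketch[];
  lastEvent: StoredEventSketch | undefined;
}> => {
  const events = eventsByAggregateIdSketch.get(aggregateId) ?? [];
  const lastEvent = events.length > 0 ? events[events.length - 1] : undefined;
  // Stand-in for applying a reducer: only the version is tracked here
  const aggregate =
    lastEvent === undefined
      ? undefined
      : { aggregateId, version: lastEvent.version };

  return { aggregate, events, lastEvent };
};

const getExistingAggregateSketch = async (aggregateId: string) => {
  const { aggregate, events, lastEvent } =
    await getAggregateSketch(aggregateId);
  if (aggregate === undefined || lastEvent === undefined) {
    throw new AggregateNotFoundErrorSketch(aggregateId);
  }

  // After the check, both values are narrowed to their defined types
  return { aggregate, events, lastEvent };
};
```

The narrowing in the wrapper is what lets callers skip undefined checks on the result.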
- pushEvent ((eventDetail: EventDetail, opt?: OptionsObj) => Promise<ResponseObj>): Pushes a new event to the event store. The timestamp is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as new Date().toISOString(). Throws an EventAlreadyExistsError if an event already exists for the corresponding aggregateId and version (see section on race conditions).
  OptionsObj contains the following properties:
  - prevAggregate (?Aggregate): The aggregate at the current version, i.e. before having pushed the event. Can be useful in some cases like when using the ConnectedEventStore class
  - force (?boolean): To force push the event even if one already exists for the corresponding aggregateId and version. Any existing event will be overridden, so use with extra care, mainly in data migrations.
  ResponseObj contains the following properties:
  - event (EventDetail): The complete event (includes the timestamp)
  - nextAggregate (?Aggregate): The aggregate at the new version, i.e. after having pushed the event. Returned only if the event is an initial event, if the prevAggregate option was provided, or when using a ConnectedEventStore class connected to a state-carrying message bus or queue
const { event: completeEvent, nextAggregate } =
  await pokemonsEventStore.pushEvent(
    {
      aggregateId: myPikachuId,
      version: lastVersion + 1,
      type: 'POKEMON_LEVELED_UP', // <= event type is correctly typed 🙌
      payload, // <= payload is typed according to the provided event type 🙌
      metadata, // <= same goes for metadata 🙌
      // timestamp is optional
    },
    // Not required - Can be useful in some cases
    { prevAggregate },
  );
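The uniqueness rule on (aggregateId, version), the default timestamp and the force option can be illustrated with a small in-memory log. This is a hedged sketch: InMemoryEventLogSketch and EventAlreadyExistsErrorSketch are hypothetical names, the log is synchronous for brevity, and it does not show how Castore or any storage adapter enforces the rule.

```typescript
// Hypothetical minimal event shape (not a Castore type)
type PushableEventSketch = {
  aggregateId: string;
  version: number;
  type: string;
  timestamp?: string;
};

class EventAlreadyExistsErrorSketch extends Error {
  constructor(aggregateId: string, version: number) {
    super(`Event already exists for ${aggregateId} at version ${version}`);
    this.name = 'EventAlreadyExistsErrorSketch';
  }
}

class InMemoryEventLogSketch {
  // One slot per (aggregateId, version) pair
  private events = new Map<string, Required<PushableEventSketch>>();

  private static key(aggregateId: string, version: number): string {
    return `${aggregateId}#${version}`;
  }

  push(
    event: PushableEventSketch,
    { force = false }: { force?: boolean } = {},
  ): Required<PushableEventSketch> {
    const key = InMemoryEventLogSketch.key(event.aggregateId, event.version);

    // Two writers computing the same next version collide here:
    // the second push fails instead of silently overwriting the first
    if (this.events.has(key) && !force) {
      throw new EventAlreadyExistsErrorSketch(event.aggregateId, event.version);
    }

    const completeEvent = {
      ...event,
      // The timestamp defaults to the current date when not provided
      timestamp: event.timestamp ?? new Date().toISOString(),
    };
    this.events.set(key, completeEvent);

    return completeEvent;
  }

  get(
    aggregateId: string,
    version: number,
  ): Required<PushableEventSketch> | undefined {
    return this.events.get(InMemoryEventLogSketch.key(aggregateId, version));
  }
}
```

A caller that hits the conflict would typically re-read the aggregate, recompute the next version and retry, which is why the error is useful for handling race conditions.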
- listAggregateIds ((opt?: OptionsObj) => Promise<ResponseObj>): Retrieves the list of aggregateId of an event store, ordered by the timestamp of their initial event. Returns an empty array if no aggregate is found.
  OptionsObj contains the following properties:
  - limit (?number): Maximum number of aggregate ids to retrieve
  - initialEventAfter (?string): To retrieve aggregate ids that appeared after a certain timestamp
  - initialEventBefore (?string): To retrieve aggregate ids that appeared before a certain timestamp
  - reverse (?boolean): To retrieve the aggregate ids in reverse order
  - pageToken (?string): To retrieve a paginated result of aggregate ids
  ResponseObj contains the following properties:
  - aggregateIds (string[]): The list of aggregate ids
  - nextPageToken (?string): A token for the next page of aggregate ids if one exists. The nextPageToken carries the previously used options, so you do not have to provide them again (though you can still do it to override them).
const accAggregateIds: string[] = [];

const { aggregateIds: firstPage, nextPageToken } =
  await pokemonsEventStore.listAggregateIds({ limit: 20 });
accAggregateIds.push(...firstPage);

if (nextPageToken) {
  const { aggregateIds: secondPage } =
    await pokemonsEventStore.listAggregateIds({
      // 👇 Previous limit of 20 is passed through the page token
      pageToken: nextPageToken,
    });
  accAggregateIds.push(...secondPage);
}
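The two-page example generalizes to a loop that keeps following nextPageToken until none is returned. The helper below is a sketch: collectAllAggregateIdsSketch and ListAggregateIdsSketch are hypothetical names, and the function type only mirrors the limit, pageToken, aggregateIds and nextPageToken properties described above.

```typescript
// Hypothetical function type mirroring the options and response described above
type ListAggregateIdsSketch = (options?: {
  limit?: number;
  pageToken?: string;
}) => Promise<{ aggregateIds: string[]; nextPageToken?: string }>;

const collectAllAggregateIdsSketch = async (
  listAggregateIds: ListAggregateIdsSketch,
  limit = 20,
): Promise<string[]> => {
  const accAggregateIds: string[] = [];
  let pageToken: string | undefined;

  do {
    // The limit is only sent with the first call: the page token is
    // expected to carry the previously used options
    const { aggregateIds, nextPageToken } = await listAggregateIds(
      pageToken === undefined ? { limit } : { pageToken },
    );
    accAggregateIds.push(...aggregateIds);
    pageToken = nextPageToken;
  } while (pageToken !== undefined);

  return accAggregateIds;
};
```

With a real event store, the first argument could presumably be options => pokemonsEventStore.listAggregateIds(options). Keep in mind that the helper loads every id in memory, which may not suit large event stores.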
Type Helpers:
- EventStoreId: Returns the EventStore id
import type { EventStoreId } from '@castore/core';
type PokemonsEventStoreId = EventStoreId<typeof pokemonsEventStore>;
// => 'POKEMONS'
- EventStoreEventTypes: Returns the EventStore list of event types
import type { EventStoreEventTypes } from '@castore/core';
type PokemonEventTypes = EventStoreEventTypes<typeof pokemonsEventStore>;
// => [typeof pokemonAppearedEventType, typeof pokemonCaughtEventType...]
- EventStoreEventDetails: Returns the union of all the EventStore possible event details
import type { EventStoreEventDetails } from '@castore/core';
type PokemonEventDetails = EventStoreEventDetails<typeof pokemonsEventStore>;
// => EventTypeDetail<typeof pokemonAppearedEventType>
// | EventTypeDetail<typeof pokemonCaughtEventType>
// | ...
- EventStoreReducer: Returns the EventStore reducer
import type { EventStoreReducer } from '@castore/core';
type PokemonsReducer = EventStoreReducer<typeof pokemonsEventStore>;
// => Reducer<PokemonAggregate, PokemonEventDetails>
- EventStoreAggregate: Returns the EventStore aggregate
import type { EventStoreAggregate } from '@castore/core';
type SomeAggregate = EventStoreAggregate<typeof pokemonsEventStore>;
// => PokemonAggregate
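Helpers of this kind are usually written with conditional types and infer. The sketch below shows the general technique on a hypothetical, heavily simplified EventStoreSketch class. None of these names come from Castore, and the library's actual definitions may differ.

```typescript
// Hypothetical, heavily simplified stand-in for an event store class
class EventStoreSketch<Id extends string, EventDetails, Aggregate> {
  constructor(
    readonly eventStoreId: Id,
    readonly reducer: (aggregate: Aggregate, event: EventDetails) => Aggregate,
  ) {}
}

// Each helper pattern-matches on the class and extracts one type parameter
type EventStoreIdSketch<Store> =
  Store extends EventStoreSketch<infer Id, any, any> ? Id : never;

type EventStoreAggregateSketch<Store> =
  Store extends EventStoreSketch<any, any, infer Aggregate> ? Aggregate : never;

type LevelAggregateSketch = { name: string; level: number };
type LevelEventSketch = { type: 'POKEMON_LEVELED_UP' };

const storeSketch = new EventStoreSketch(
  'POKEMONS',
  // The explicit return type is what the Aggregate parameter is inferred from
  (
    aggregate: LevelAggregateSketch,
    _event: LevelEventSketch,
  ): LevelAggregateSketch => ({ ...aggregate, level: aggregate.level + 1 }),
);

type SketchStoreId = EventStoreIdSketch<typeof storeSketch>;
type SketchStoreAggregate = EventStoreAggregateSketch<typeof storeSketch>;

// Both assignments type-check only if the helpers resolve as intended
const sketchStoreId: SketchStoreId = storeSketch.eventStoreId;
const leveledUpSketch: SketchStoreAggregate = storeSketch.reducer(
  { name: 'Pikachu', level: 1 },
  { type: 'POKEMON_LEVELED_UP' },
);
```

Deriving types from typeof the store keeps a single source of truth: changing the reducer's return type updates every dependent type without further edits.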