📙 Event Store
Once you've defined your event types and how to aggregate them, you can bundle them together in an EventStore class. Each event store in your application represents a business entity.
Think of event stores as "what tables would be in CRUD", except that instead of directly updating data, you just append new events to them!
In Castore, EventStore classes are NOT responsible for actually storing data (this will come with event storage adapters). Rather, they provide a boilerplate-free and type-safe interface to perform many actions such as:
- Listing aggregate ids
- Accessing events of an aggregate
- Building an aggregate with the reducer
- Pushing new events etc.
import { EventStore } from '@castore/core';
const pokemonsEventStore = new EventStore({
eventStoreId: 'POKEMONS',
eventTypes: [
pokemonAppearedEventType,
pokemonCaughtEventType,
pokemonLeveledUpEventType,
...
],
reducer: pokemonsReducer,
});
// ...and that's it 🥳
☝️ The EventStore class is the heart of Castore; it even gave it its name!
🔧 Reference
Constructor:
- eventStoreId (string): A string identifying the event store
- eventTypes (EventType[]): The list of event types in the event store
- reducer ((Aggregate, EventType) => Aggregate): A reducer function that can be applied to the store event types
- onEventPushed (?(pushEventResponse: PushEventResponse) => Promise<void>): To run a callback after events are pushed (input is exactly the return value of the pushEvent method)
- eventStorageAdapter (?EventStorageAdapter): See fetching events
☝️ The return type of the reducer is used to infer the Aggregate type of the EventStore, so it is important to type it explicitly.
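To make the note above concrete, here is a minimal sketch of a reducer with an explicit return type. The PokemonAggregate and PokemonEventDetail shapes are assumptions made up for this illustration, and ReturnType stands in for whatever inference the EventStore performs internally. It does not import Castore.

```typescript
// Hypothetical shapes, for illustration only (not taken from Castore)
type PokemonAggregate = {
  aggregateId: string;
  version: number;
  name: string;
  level: number;
  status: 'wild' | 'caught';
};

type PokemonEventDetail =
  | {
      aggregateId: string;
      version: number;
      type: 'POKEMON_APPEARED';
      payload: { name: string; level: number };
    }
  | { aggregateId: string; version: number; type: 'POKEMON_CAUGHT' }
  | { aggregateId: string; version: number; type: 'POKEMON_LEVELED_UP' };

// 👇 Explicit return type: this is what the aggregate type can be inferred from
const pokemonsReducer = (
  pokemonAggregate: PokemonAggregate,
  event: PokemonEventDetail,
): PokemonAggregate => {
  const { aggregateId, version } = event;

  switch (event.type) {
    case 'POKEMON_APPEARED': {
      const { name, level } = event.payload;
      return { aggregateId, version, name, level, status: 'wild' };
    }
    case 'POKEMON_CAUGHT':
      return { ...pokemonAggregate, version, status: 'caught' };
    case 'POKEMON_LEVELED_UP':
      return { ...pokemonAggregate, version, level: pokemonAggregate.level + 1 };
  }
};

// Stand-in for the inference done on the reducer (assumption, not Castore's code)
type InferredAggregate = ReturnType<typeof pokemonsReducer>;
// => PokemonAggregate
```

Without the annotation, TypeScript would likely infer a looser type from the returned object literals (for instance with status widened to string), and that looser type is what would propagate to the store.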
Properties:
eventStoreId (string)
const pokemonsEventStoreId = pokemonsEventStore.eventStoreId;
// => 'POKEMONS'
eventTypes (EventType[])
const pokemonsEventTypes = pokemonsEventStore.eventTypes;
// => [pokemonAppearedEventType, pokemonCaughtEventType...]
reduce ((Aggregate, EventType) => Aggregate)
const reducer = pokemonsEventStore.reduce;
// => pokemonsReducer
onEventPushed (?(pushEventResponse: PushEventResponse) => Promise<void>): Callback to run after events are pushed
const onEventPushed = pokemonsEventStore.onEventPushed;
// => undefined (we did not provide one in this example)
eventStorageAdapter (?EventStorageAdapter): See fetching events
const eventStorageAdapter = pokemonsEventStore.eventStorageAdapter;
// => undefined (we did not provide one in this example)
☝️ The eventStorageAdapter is not read-only so you do not have to provide it right away.
Sync Methods:
getEventStorageAdapter (() => EventStorageAdapter): Returns the event store event storage adapter if it exists. Throws an UndefinedEventStorageAdapterError if it doesn't.
import { UndefinedEventStorageAdapterError } from '@castore/core';
expect(() => pokemonsEventStore.getEventStorageAdapter()).toThrow(
new UndefinedEventStorageAdapterError({ eventStoreId: 'POKEMONS' }),
);
// => true
buildAggregate ((eventDetails: EventDetail[], initialAggregate?: Aggregate) => Aggregate | undefined): Applies the event store reducer to a series of events.
const myPikachuAggregate = pokemonsEventStore.buildAggregate(myPikachuEvents);
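Conceptually, building an aggregate is a fold of the reducer over the events, optionally starting from an existing aggregate. The sketch below shows that idea in isolation. buildAggregateSketch, Level and LeveledUp are hypothetical names for this illustration, and the code makes no claim about how Castore implements buildAggregate.

```typescript
// Hypothetical helper: folds events into an aggregate, optionally from a starting point
const buildAggregateSketch = <AGGREGATE, EVENT>(
  reducer: (aggregate: AGGREGATE | undefined, event: EVENT) => AGGREGATE,
  events: EVENT[],
  initialAggregate?: AGGREGATE,
): AGGREGATE | undefined =>
  events.reduce<AGGREGATE | undefined>(reducer, initialAggregate);

// Made-up aggregate and event shapes
type Level = { level: number; version: number };
type LeveledUp = { type: 'POKEMON_LEVELED_UP'; version: number };

// Assumes a pokemon starts at level 1 when there is no previous aggregate
const levelReducer = (aggregate: Level | undefined, event: LeveledUp): Level => ({
  level: (aggregate?.level ?? 1) + 1,
  version: event.version,
});

const leveledUpEvents: LeveledUp[] = [
  { type: 'POKEMON_LEVELED_UP', version: 1 },
  { type: 'POKEMON_LEVELED_UP', version: 2 },
];

// 👇 From scratch: both events are applied
const fromScratch = buildAggregateSketch(levelReducer, leveledUpEvents);
// => { level: 3, version: 2 }

// 👇 From an aggregate at version 1: only the remaining event is applied
const fromVersion1 = buildAggregateSketch(
  levelReducer,
  leveledUpEvents.slice(1),
  { level: 2, version: 1 },
);
// => { level: 3, version: 2 }

// 👇 No events and no initial aggregate
const nothing = buildAggregateSketch(levelReducer, []);
// => undefined
```

This is also why the return type includes undefined: with no events and no initial aggregate, there is nothing to build.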
groupEvent ((eventDetail: EventDetail, opt?: OptionsObj) => GroupedEvent): See joining data.
Async Methods:
The following methods interact with the data layer of your event store through its EventStorageAdapter. They will throw an UndefinedEventStorageAdapterError if you did not provide one.
getEvents ((aggregateId: string, opt?: OptionsObj) => Promise<ResponseObj>): Retrieves the events of an aggregate, ordered by version. Returns an empty array if no event is found for this aggregateId.
OptionsObj contains the following properties:
- minVersion (?number): To retrieve events above a certain version
- maxVersion (?number): To retrieve events below a certain version
- limit (?number): Maximum number of events to retrieve
- reverse (?boolean = false): To retrieve events in reverse order (does not require swapping minVersion and maxVersion)
ResponseObj contains the following properties:
- events (EventDetail[]): The aggregate events (possibly empty)
const { events: allEvents } = await pokemonsEventStore.getEvents(myPikachuId);
// => typed as PokemonEventDetail[] 🙌
// 👇 Retrieve a range of events
const { events: rangedEvents } = await pokemonsEventStore.getEvents(
myPikachuId,
{
minVersion: 2,
maxVersion: 5,
},
);
// 👇 Retrieve the last event of the aggregate
const { events: onlyLastEvent } = await pokemonsEventStore.getEvents(
myPikachuId,
{
reverse: true,
limit: 1,
},
);
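To visualize how these options combine, here is a rough in-memory sketch. selectEvents is a hypothetical helper, not part of Castore, and it assumes that minVersion and maxVersion are inclusive bounds and that limit is applied after ordering. Check your storage adapter for its exact behavior.

```typescript
type EventDetail = { aggregateId: string; version: number; type: string };

type GetEventsOptions = {
  minVersion?: number;
  maxVersion?: number;
  limit?: number;
  reverse?: boolean;
};

// Hypothetical in-memory stand-in for an adapter lookup (assumes inclusive bounds)
const selectEvents = (
  allEvents: EventDetail[],
  aggregateId: string,
  { minVersion, maxVersion, limit, reverse = false }: GetEventsOptions = {},
): EventDetail[] => {
  // filter returns a new array, so sorting below does not mutate the input
  const selected = allEvents
    .filter(event => event.aggregateId === aggregateId)
    .filter(event => minVersion === undefined || event.version >= minVersion)
    .filter(event => maxVersion === undefined || event.version <= maxVersion)
    .sort((eventA, eventB) => eventA.version - eventB.version);

  // The version bounds keep their meaning, only the output order changes
  if (reverse) selected.reverse();

  return limit === undefined ? selected : selected.slice(0, limit);
};

const pikachuEvents: EventDetail[] = [1, 2, 3, 4, 5, 6].map(version => ({
  aggregateId: 'pikachu1',
  version,
  type: 'POKEMON_LEVELED_UP',
}));

const ranged = selectEvents(pikachuEvents, 'pikachu1', {
  minVersion: 2,
  maxVersion: 5,
});
// => versions 2, 3, 4 and 5

const lastOnly = selectEvents(pikachuEvents, 'pikachu1', {
  reverse: true,
  limit: 1,
});
// => version 6 only
```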
getAggregate ((aggregateId: string, opt?: OptionsObj) => Promise<ResponseObj>): Retrieves the events of an aggregate and builds it.
OptionsObj contains the following properties:
- maxVersion (?number): To retrieve the aggregate below a certain version
ResponseObj contains the following properties:
- aggregate (?Aggregate): The aggregate (possibly undefined)
- events (EventDetail[]): The aggregate events (possibly empty)
- lastEvent (?EventDetail): The last event (possibly undefined)
const { aggregate: myPikachu } = await pokemonsEventStore.getAggregate(
myPikachuId,
);
// => typed as PokemonAggregate | undefined 🙌
// 👇 Retrieve an aggregate below a certain version
const { aggregate: pikachuBelowVersion5 } =
await pokemonsEventStore.getAggregate(myPikachuId, { maxVersion: 5 });
// 👇 Returns the events if you need them
const { aggregate, events } = await pokemonsEventStore.getAggregate(
myPikachuId,
);
getExistingAggregate ((aggregateId: string, opt?: OptionsObj) => Promise<ResponseObj>): Same as the getAggregate method, but ensures that the aggregate exists. Throws an AggregateNotFoundError if no event is found for this aggregateId.
import { AggregateNotFoundError } from '@castore/core';
// 👇 The promise rejects, so assert with `rejects` and await the assertion
await expect(
pokemonsEventStore.getExistingAggregate(unexistingId),
).rejects.toThrow(
new AggregateNotFoundError({
eventStoreId: 'POKEMONS',
aggregateId: unexistingId,
}),
);
const { aggregate } = await pokemonsEventStore.getExistingAggregate(
aggregateId,
);
// => 'aggregate' and 'lastEvent' are always defined 🙌
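The "always defined" guarantee can be pictured as a small guard over a getAggregate-like response. The sketch below uses hypothetical names (ensureExisting, AggregateNotFoundSketchError, GetAggregateResponse) and does not reflect Castore's internals. It only shows how throwing on a missing aggregate lets the types drop undefined.

```typescript
// Hypothetical response shape, mirroring the properties listed above
type GetAggregateResponse<AGGREGATE, EVENT> = {
  aggregate?: AGGREGATE;
  events: EVENT[];
  lastEvent?: EVENT;
};

// Hypothetical error, standing in for AggregateNotFoundError
class AggregateNotFoundSketchError extends Error {
  constructor(aggregateId: string) {
    super(`Unable to find aggregate ${aggregateId}`);
    this.name = 'AggregateNotFoundSketchError';
  }
}

// Throws if nothing was found, otherwise returns a response without optional fields
const ensureExisting = <AGGREGATE, EVENT>(
  aggregateId: string,
  { aggregate, events, lastEvent }: GetAggregateResponse<AGGREGATE, EVENT>,
): { aggregate: AGGREGATE; events: EVENT[]; lastEvent: EVENT } => {
  if (aggregate === undefined || lastEvent === undefined) {
    throw new AggregateNotFoundSketchError(aggregateId);
  }

  // 👇 Both are narrowed here: no more `| undefined`
  return { aggregate, events, lastEvent };
};
```

Callers can then read aggregate and lastEvent directly, without optional chaining or non-null assertions.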
pushEvent ((eventDetail: EventDetail, opt?: OptionsObj) => Promise<ResponseObj>): Pushes a new event to the event store. The timestamp is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as new Date().toISOString(). Throws an EventAlreadyExistsError if an event already exists for the corresponding aggregateId and version (see section on race conditions).
OptionsObj contains the following properties:
- prevAggregate (?Aggregate): The aggregate at the current version, i.e. before having pushed the event. Can be useful in some cases like when using the ConnectedEventStore class
- force (?boolean): To force push the event even if one already exists for the corresponding aggregateId and version. Any existing event will be overridden, so use with extra care, mainly in data migrations.
ResponseObj contains the following properties:
- event (EventDetail): The complete event (includes the timestamp)
- nextAggregate (?Aggregate): The aggregate at the new version, i.e. after having pushed the event. Returned only if the event is an initial event, if the prevAggregate option was provided, or when using a ConnectedEventStore class connected to a state-carrying message bus or queue
const { event: completeEvent, nextAggregate } =
await pokemonsEventStore.pushEvent(
{
aggregateId: myPikachuId,
version: lastVersion + 1,
type: 'POKEMON_LEVELED_UP', // <= event type is correctly typed 🙌
payload, // <= payload is typed according to the provided event type 🙌
metadata, // <= same goes for metadata 🙌
// timestamp is optional
},
// Not required - Can be useful in some cases
{ prevAggregate },
);
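The version check described above can be sketched with a tiny in-memory log keyed by aggregateId and version. InMemoryEventLog and EventAlreadyExistsSketchError are hypothetical names for this illustration. Real storage adapters enforce the same uniqueness in their own way, and this is not Castore's code.

```typescript
type EventInput = {
  aggregateId: string;
  version: number;
  type: string;
  timestamp?: string;
};

type StoredEvent = EventInput & { timestamp: string };

// Hypothetical error, standing in for EventAlreadyExistsError
class EventAlreadyExistsSketchError extends Error {
  constructor(aggregateId: string, version: number) {
    super(`Event already exists for ${aggregateId} at version ${version}`);
    this.name = 'EventAlreadyExistsSketchError';
  }
}

// Hypothetical in-memory log: at most one event per (aggregateId, version) pair
class InMemoryEventLog {
  private events = new Map<string, StoredEvent>();

  push(event: EventInput, { force = false }: { force?: boolean } = {}): StoredEvent {
    const key = `${event.aggregateId}#${event.version}`;

    // Two writers computing the same next version: only the first one succeeds
    if (this.events.has(key) && !force) {
      throw new EventAlreadyExistsSketchError(event.aggregateId, event.version);
    }

    // The timestamp defaults to "now" when not provided
    const completeEvent: StoredEvent = {
      ...event,
      timestamp: event.timestamp ?? new Date().toISOString(),
    };

    // With force, any existing event at this key is overridden
    this.events.set(key, completeEvent);

    return completeEvent;
  }
}
```

When the error is thrown, a typical reaction is to re-fetch the aggregate, recompute the event against the latest version and push again.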
listAggregateIds ((opt?: OptionsObj) => Promise<ResponseObj>): Retrieves the list of aggregateId of an event store, ordered by the timestamp of their initial event. Returns an empty array if no aggregate is found.
OptionsObj contains the following properties:
- limit (?number): Maximum number of aggregate ids to retrieve
- initialEventAfter (?string): To retrieve aggregate ids that appeared after a certain timestamp
- initialEventBefore (?string): To retrieve aggregate ids that appeared before a certain timestamp
- reverse (?boolean): To retrieve the aggregate ids in reverse order
- pageToken (?string): To retrieve a paginated result of aggregate ids
ResponseObj contains the following properties:
- aggregateIds (string[]): The list of aggregate ids
- nextPageToken (?string): A token for the next page of aggregate ids if one exists. The nextPageToken carries the previously used options, so you do not have to provide them again (though you can still do it to override them).
const accAggregateIds: string[] = [];
const { aggregateIds: firstPage, nextPageToken } =
await pokemonsEventStore.listAggregateIds({ limit: 20 });
accAggregateIds.push(...firstPage);
if (nextPageToken) {
const { aggregateIds: secondPage } =
await pokemonsEventStore.listAggregateIds({
// 👇 Previous limit of 20 is passed through the page token
pageToken: nextPageToken,
});
accAggregateIds.push(...secondPage);
}
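The two-page example generalizes to a loop that follows nextPageToken until it runs out. Here is a minimal sketch: listAllAggregateIds and the AggregateIdsLister type are hypothetical, written against the option and response shapes documented above rather than imported from Castore.

```typescript
type ListAggregateIdsOptions = { limit?: number; pageToken?: string };
type ListAggregateIdsResponse = {
  aggregateIds: string[];
  nextPageToken?: string;
};

// Structural type: anything exposing a compatible listAggregateIds method
type AggregateIdsLister = {
  listAggregateIds: (
    opt?: ListAggregateIdsOptions,
  ) => Promise<ListAggregateIdsResponse>;
};

// Hypothetical helper: accumulates every page of aggregate ids
const listAllAggregateIds = async (
  lister: AggregateIdsLister,
  limit = 20,
): Promise<string[]> => {
  const accAggregateIds: string[] = [];
  let pageToken: string | undefined;

  do {
    const page: ListAggregateIdsResponse = await lister.listAggregateIds(
      // 👇 Options are only needed on the first call: the token carries them afterwards
      pageToken === undefined ? { limit } : { pageToken },
    );

    accAggregateIds.push(...page.aggregateIds);
    pageToken = page.nextPageToken;
  } while (pageToken !== undefined);

  return accAggregateIds;
};
```

Given the signatures documented here, an event store should be accepted as the lister, e.g. await listAllAggregateIds(pokemonsEventStore). Keep in mind that this loads every id in memory, so it is better suited to scripts and migrations than to request handlers.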
Type Helpers:
EventStoreId: Returns the EventStore id
import type { EventStoreId } from '@castore/core';
type PokemonsEventStoreId = EventStoreId<typeof pokemonsEventStore>;
// => 'POKEMONS'
EventStoreEventTypes: Returns the EventStore list of event types
import type { EventStoreEventTypes } from '@castore/core';
type PokemonEventTypes = EventStoreEventTypes<typeof pokemonsEventStore>;
// => [typeof pokemonAppearedEventType, typeof pokemonCaughtEventType...]
EventStoreEventDetails: Returns the union of all the EventStore possible event details
import type { EventStoreEventDetails } from '@castore/core';
type PokemonEventDetails = EventStoreEventDetails<typeof pokemonsEventStore>;
// => EventTypeDetail<typeof pokemonAppearedEventType>
// | EventTypeDetail<typeof pokemonCaughtEventType>
// | ...
EventStoreReducer: Returns the EventStore reducer
import type { EventStoreReducer } from '@castore/core';
type PokemonsReducer = EventStoreReducer<typeof pokemonsEventStore>;
// => Reducer<PokemonAggregate, PokemonEventDetails>
EventStoreAggregate: Returns the EventStore aggregate
import type { EventStoreAggregate } from '@castore/core';
type SomeAggregate = EventStoreAggregate<typeof pokemonsEventStore>;
// => PokemonAggregate