@wataruoguchi/emmett-event-store-kysely
A Kysely-based event store implementation for Emmett, providing event sourcing capabilities with PostgreSQL.
Overview
@wataruoguchi/emmett-event-store-kysely is a production-ready event store implementation that enables you to build event-sourced applications with PostgreSQL and TypeScript. It provides full compatibility with the Emmett framework while adding powerful features like snapshot projections, event consumers, and multi-tenancy support.
Key Features
- Full Event Sourcing - Complete implementation of Emmett's event store interface
- Snapshot Projections - This package's recommended approach for building read models that reuse your write model logic
- Event Consumers - Continuous background event processing with checkpoint tracking
- Multi-Tenancy - Built-in partition support for tenant isolation
- Type Safety - Full TypeScript support with discriminated unions and type inference
- PostgreSQL Optimized - Efficient queries and indexing for high-performance event storage
Architecture
The package provides:
- Event Store - Core event sourcing functionality (
getKyselyEventStore) - Snapshot Projections - Reuse your domain's
evolvefunction for read models - Projection Runner - On-demand projection execution (useful for testing and production workflows)
- Event Consumer - Asynchronous background processing for production
Getting Started
Installation
npm install @wataruoguchi/emmett-event-store-kysely @event-driven-io/emmett kysely pgDatabase Setup
First, set up the required PostgreSQL tables. The event store requires three tables:
messages- Stores individual eventsstreams- Tracks stream metadata and positionssubscriptions- Manages consumer checkpoints
See the migration example for the complete schema.
Legacy approach: A read model table should have these columns:
stream_id(TEXT/VARCHAR)last_stream_position(BIGINT)last_global_position(BIGINT)partition(TEXT)snapshot(JSONB) - Your aggregate state
New approach (recommended): Use createSnapshotProjectionWithSnapshotTable to store snapshots in a separate centralized snapshots table, keeping read model tables clean with only keys and denormalized columns.
Create Event Store
import { getKyselyEventStore } from "@wataruoguchi/emmett-event-store-kysely";
import { Kysely, PostgresDialect } from "kysely";
import { Pool } from "pg";
const db = new Kysely({
dialect: new PostgresDialect({
pool: new Pool({ connectionString: process.env.DATABASE_URL }),
}),
});
const eventStore = getKyselyEventStore({
db,
logger: console,
});Write Events with Command Handlers
You typically use Emmett's DeciderCommandHandler to create an event handler that wraps the event store and provides domain-specific methods:
import { DeciderCommandHandler } from "@event-driven-io/emmett";
import type { KyselyEventStore } from "@wataruoguchi/emmett-event-store-kysely";
// Define your domain logic
function createDecide() {
return (command: CreateCart, state: CartState): CartCreated => {
// Business logic validation
if (state.status !== "init") {
throw new Error("Cart already exists");
}
// Return event(s)
return {
type: "CartCreated",
data: {
eventData: { currency: command.data.currency },
eventMeta: {
tenantId: command.data.tenantId,
cartId: command.data.cartId,
createdBy: "user-123",
version: 1,
},
},
};
};
}
function createEvolve() {
return (state: CartState, event: CartEvent): CartState => {
switch (event.type) {
case "CartCreated":
return {
status: "active",
tenantId: event.data.eventMeta.tenantId,
cartId: event.data.eventMeta.cartId,
currency: event.data.eventData.currency,
items: [],
};
// ... other event handlers
}
};
}
// Create an event handler for your domain
export function cartEventHandler({
eventStore,
}: {
eventStore: KyselyEventStore;
}) {
const handler = DeciderCommandHandler({
decide: createDecide(),
evolve: createEvolve(),
initialState: () => ({ status: "init", items: [] }),
});
return {
create: (cartId: string, data: { tenantId: string; currency: string }) =>
handler(
eventStore,
cartId,
{ type: "CreateCart", data },
{ partition: data.tenantId, streamType: "cart" },
),
// ... other domain methods
};
}
// Usage
const eventStore = getKyselyEventStore({ db, logger });
const cartHandler = cartEventHandler({ eventStore });
await cartHandler.create("cart-123", {
tenantId: "tenant-456",
currency: "USD",
});Reading from Read Models
For reading data, you query your read models (projections) through repositories, not directly from the event store:
// Read from the projected read model table
const cart = await db
.selectFrom("carts")
.selectAll()
.where("cart_id", "=", "cart-123")
.where("tenant_id", "=", "tenant-456")
.executeTakeFirst();
// Access full state from snapshot
const state = cart.snapshot as CartState;Build Read Models with Snapshot Projections
This package recommends using snapshot projections, which reuse your domain's evolve function to ensure consistency between write and read models. There are two approaches:
Option A: Separate Snapshot Table (Recommended) ⭐
Use createSnapshotProjectionRegistryWithSnapshotTable to store snapshots in a centralized table:
import {
createSnapshotProjectionRegistryWithSnapshotTable
} from "@wataruoguchi/emmett-event-store-kysely";
// First, create the snapshots table:
// CREATE TABLE snapshots (
// readmodel_table_name TEXT NOT NULL,
// stream_id TEXT NOT NULL,
// last_stream_position BIGINT NOT NULL,
// last_global_position BIGINT NOT NULL,
// snapshot JSONB NOT NULL,
// PRIMARY KEY (readmodel_table_name, stream_id)
// );
// Reuse your write model's evolve function!
const registry = createSnapshotProjectionRegistryWithSnapshotTable(
["CartCreated", "ItemAdded", "CartCheckedOut"],
{
tableName: "carts",
extractKeys: (event, partition) => ({
tenant_id: event.data.eventMeta.tenantId,
cart_id: event.data.eventMeta.cartId,
partition,
}),
evolve: domainEvolve, // Same function as write model!
initialState: () => ({ status: "init", items: [] }),
mapToColumns: (state) => ({ // Optional: denormalize for queries
currency: state.status !== "init" ? state.currency : null,
total: state.status === "checkedOut" ? state.total : null,
}),
}
);Benefits:
- ✅ Cleaner read model tables (no event-sourcing columns needed)
- ✅ Easier to create new read models (no schema migrations for event-sourcing columns)
- ✅ Centralized snapshot management
Read model table schema:
CREATE TABLE carts (
tenant_id VARCHAR(100) NOT NULL,
cart_id VARCHAR(100) NOT NULL,
partition VARCHAR(100) NOT NULL,
-- Optional: Denormalized columns from mapToColumns
currency VARCHAR(3),
total NUMERIC(10, 2),
PRIMARY KEY (tenant_id, cart_id, partition)
);Option B: Legacy Approach (Backward Compatible)
Use createSnapshotProjectionRegistry to store everything in the read model table:
import {
createSnapshotProjectionRegistry
} from "@wataruoguchi/emmett-event-store-kysely";
// Reuse your write model's evolve function!
const registry = createSnapshotProjectionRegistry(
["CartCreated", "ItemAdded", "CartCheckedOut"],
{
tableName: "carts",
extractKeys: (event, partition) => ({
tenant_id: event.data.eventMeta.tenantId,
cart_id: event.data.eventMeta.cartId,
partition,
}),
evolve: domainEvolve, // Same function as write model!
initialState: () => ({ status: "init", items: [] }),
mapToColumns: (state) => ({ // Optional: denormalize for queries
currency: state.status !== "init" ? state.currency : null,
total: state.status === "checkedOut" ? state.total : null,
}),
}
);Read model table schema:
CREATE TABLE carts (
tenant_id VARCHAR(100) NOT NULL,
cart_id VARCHAR(100) NOT NULL,
partition VARCHAR(100) NOT NULL,
-- Required: Complete state
snapshot JSONB NOT NULL,
-- Required: Tracking
stream_id VARCHAR(255) NOT NULL,
last_stream_position BIGINT NOT NULL,
last_global_position BIGINT NOT NULL,
-- Optional: Denormalized columns from mapToColumns
currency VARCHAR(3),
total NUMERIC(10, 2),
PRIMARY KEY (tenant_id, cart_id, partition)
);Arguments (both approaches):
- First argument: Array of event types to handle
- Second argument: Configuration object
tableName: Database table name for the projectionextractKeys: Function that returns primary key values from the event (keys are inferred automatically)evolve: Your domain's evolve function (reuse from write model)initialState: Function that returns the initial aggregate statemapToColumns(optional): Function to denormalize state fields into table columns for querying
Process Events (On-Demand)
For on-demand processing (tests, backfills, or scheduled jobs), use the projection runner:
import { createProjectionRunner } from "@wataruoguchi/emmett-event-store-kysely";
const runner = createProjectionRunner({
db,
readStream: eventStore.readStream,
registry,
});
await runner.projectEvents("subscription-id", "cart-123", {
partition: "tenant-456"
});Note: The projection runner executes asynchronously when called (not blocking), processing events immediately on-demand. This makes it suitable for:
- Tests - Fast, deterministic execution
- Production Workers - Scheduled jobs or background workers (see example worker below)
- Backfills - Reprocessing historical events
- Manual Triggers - On-demand reprocessing
Production Worker Example
The projection runner can be used in production workers for scheduled or continuous processing:
// In a worker process
const runner = createProjectionRunner({ db, readStream, registry });
// Process all streams in a partition
while (true) {
const streams = await db
.selectFrom("streams")
.select(["stream_id"])
.where("partition", "=", partition)
.limit(50)
.execute();
for (const stream of streams) {
await runner.projectEvents("worker-subscription", stream.stream_id, {
partition,
batchSize: 200,
});
}
await new Promise((resolve) => setTimeout(resolve, 1000)); // Poll interval
}Process Events (Continuous Background Processing)
For continuous, automatic background processing, use the event consumer:
import { createKyselyEventStoreConsumer } from "@wataruoguchi/emmett-event-store-kysely";
const consumer = createKyselyEventStoreConsumer({
db,
logger,
consumerName: "carts-read-model",
batchSize: 100,
pollingInterval: 1000, // Poll every 1 second
});
// Subscribe to events
consumer.subscribe(async (event) => {
// Process event
await processEvent({ db, partition: event.metadata.partition }, event);
}, "CartCreated");
await consumer.start();API Reference
Event Store
getKyselyEventStore(deps: Dependencies): KyselyEventStore
Creates a new event store instance.
Parameters:
interface Dependencies {
db: DatabaseExecutor; // Kysely database instance
logger?: Logger; // Optional logger
inTransaction?: boolean;
}Returns: KyselyEventStore - Event store instance
Example:
const eventStore = getKyselyEventStore({ db, logger });Event Store Methods
The event store implements Emmett's EventStore interface. Typically, you don't call these methods directly—instead, you use DeciderCommandHandler from Emmett which internally uses these methods:
appendToStream()- Appends events to a stream (used internally by DeciderCommandHandler)readStream()- Reads events from a stream (used internally for state reconstruction)aggregateStream()- Rebuilds aggregate state from events (used internally by DeciderCommandHandler)
For most use cases, you'll work with command handlers rather than calling these methods directly. However, they're available if you need lower-level control.
aggregateStream(streamName: string, options): Promise<AggregateStreamResult>
Rebuilds aggregate state from events. This is typically used internally by DeciderCommandHandler.
const result = await eventStore.aggregateStream("cart-123", {
partition: "tenant-456",
evolve: (state, event) => { /* ... */ },
getInitialState: () => ({ status: "init" }),
});Snapshot Projections
createSnapshotProjectionRegistryWithSnapshotTable(eventTypes, config) ⭐ Recommended
Creates a projection registry for snapshot-based read models using a separate centralized snapshots table.
const registry = createSnapshotProjectionRegistryWithSnapshotTable(
["CartCreated", "ItemAdded"],
{
tableName: "carts",
extractKeys: (event, partition) => ({ /* ... */ }),
evolve: domainEvolve,
initialState: () => ({ /* ... */ }),
mapToColumns: (state) => ({ /* ... */ }), // Optional
}
);Benefits:
- Cleaner read model tables (no event-sourcing columns)
- Easier to create new read models
- Centralized snapshot management
- Deterministic
stream_idconstruction from keys (URL-encoded for safety)
Database schema required:
CREATE TABLE snapshots (
readmodel_table_name TEXT NOT NULL,
stream_id TEXT NOT NULL,
last_stream_position BIGINT NOT NULL,
last_global_position BIGINT NOT NULL,
snapshot JSONB NOT NULL,
PRIMARY KEY (readmodel_table_name, stream_id)
);Important Notes:
- Primary Key Consistency: The
extractKeysfunction must return the same set of keys for all events. The projection validates this at runtime and will throw an error if keys are inconsistent. - Idempotency: Events with
streamPosition <= lastProcessedPositionare automatically skipped, ensuring idempotent processing. - Race Condition Protection: Runs each projection update inside a transaction opened by the projection runner and uses
FOR UPDATErow-level locking within that transaction to prevent concurrent conflicts. - Snapshot Format: Handles both string and parsed JSON snapshot formats (different database drivers return JSONB differently).
- Special Characters: Keys with special characters (like
|or:) are safely URL-encoded in thestream_idconstruction.
createSnapshotProjectionRegistry(eventTypes, config) (Legacy)
Creates a projection registry for snapshot-based read models (legacy approach - stores everything in the read model table).
const registry = createSnapshotProjectionRegistry(
["CartCreated", "ItemAdded"],
{
tableName: "carts",
extractKeys: (event, partition) => ({ /* ... */ }),
evolve: domainEvolve,
initialState: () => ({ /* ... */ }),
mapToColumns: (state) => ({ /* ... */ }), // Optional
}
);Important Notes:
- Primary Key Consistency: The
extractKeysfunction must return the same set of keys for all events. The projection validates this at runtime and will throw an error if keys are inconsistent. - Idempotency: Events with
streamPosition <= lastProcessedPositionare automatically skipped, ensuring idempotent processing. - Race Condition Protection: Uses
FOR UPDATErow-level locking to prevent concurrent transaction conflicts. - Snapshot Format: Handles both string and parsed JSON snapshot formats (different database drivers return JSONB differently).
- Transaction Safety: All operations run within a transaction to ensure atomicity.
Projection Runner
createProjectionRunner(deps): ProjectionRunner
Creates a projection runner for on-demand event processing. The runner executes immediately when called and processes events in batches, making it suitable for tests, backfills, or scheduled production jobs.
const runner = createProjectionRunner({
db,
readStream: eventStore.readStream,
registry,
});
await runner.projectEvents("subscription-id", "stream-id", {
partition: "tenant-id",
});Event Consumer
createKyselyEventStoreConsumer(config): KyselyEventStoreConsumer
Creates an event consumer that automatically polls and processes events by global position across all streams. Ideal for continuous, hands-off background processing.
const consumer = createKyselyEventStoreConsumer({
db,
logger,
consumerName: "my-consumer",
batchSize: 100,
pollingInterval: 1000,
});
consumer.subscribe(async (event) => {
// Handle event
}, "EventType");
await consumer.start();
await consumer.stop(); // Graceful shutdownUseful Links
Event Sourcing
- Emmett Documentation - Official Emmett framework documentation
- Event-Driven Architecture - Event sourcing patterns and best practices
- Event Sourcing Explained - Martin Fowler's explanation of event sourcing
Read Models & Projections
- CQRS Pattern - Command Query Responsibility Segregation
- Guide to Projections and Read Models in Event-Driven Architecture - Understanding projections
- Snapshot Projections Guide - Detailed guide to snapshot projections
PostgreSQL & Kysely
- Kysely Documentation - Type-safe SQL query builder
- PostgreSQL Documentation - Official PostgreSQL docs
- PostgreSQL Partitioning - Multi-tenancy with partitioning
TypeScript
- Discriminated Unions - Type-safe event patterns
Testing
- Vitest Documentation - Fast unit test framework
- Testing Projections Guide - Best practices for testing projections