
@wataruoguchi/emmett-event-store-kysely

A Kysely-based event store implementation for Emmett, providing event sourcing capabilities with PostgreSQL.

Overview

@wataruoguchi/emmett-event-store-kysely is a production-ready event store implementation that enables you to build event-sourced applications with PostgreSQL and TypeScript. It provides full compatibility with the Emmett framework while adding powerful features like snapshot projections, event consumers, and multi-tenancy support.

Key Features

  • Full Event Sourcing - Complete implementation of Emmett's event store interface
  • Snapshot Projections - The recommended approach for building read models by reusing your write model's evolve logic
  • Event Consumers - Continuous background event processing with checkpoint tracking
  • Multi-Tenancy - Built-in partition support for tenant isolation
  • Type Safety - Full TypeScript support with discriminated unions and type inference
  • PostgreSQL Optimized - Efficient queries and indexing for high-performance event storage

Architecture

The package provides:

  • Event Store - Core event sourcing functionality (getKyselyEventStore)
  • Snapshot Projections - Reuse your domain's evolve function for read models
  • Projection Runner - On-demand projection execution (useful for testing and production workflows)
  • Event Consumer - Asynchronous background processing for production

Getting Started

Installation

bash
npm install @wataruoguchi/emmett-event-store-kysely @event-driven-io/emmett kysely pg

Database Setup

First, set up the required PostgreSQL tables. The event store requires three tables:

  • messages - Stores individual events
  • streams - Tracks stream metadata and positions
  • subscriptions - Manages consumer checkpoints

See the migration example for the complete schema.

A read model table should have these columns:

  • stream_id (TEXT/VARCHAR)
  • last_stream_position (BIGINT)
  • last_global_position (BIGINT)
  • partition (TEXT)
  • snapshot (JSONB) - Your aggregate state
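
A minimal sketch of a Kysely migration for such a table, using the "carts" table and the tenant_id/cart_id/partition primary key from the examples later on this page. This is illustrative only, not the package's official migration; the denormalized currency and total columns come from the optional mapToColumns example shown below.

typescript
import { Kysely } from "kysely";

export async function up(db: Kysely<any>): Promise<void> {
  await db.schema
    .createTable("carts")
    .addColumn("tenant_id", "text", (col) => col.notNull())
    .addColumn("cart_id", "text", (col) => col.notNull())
    .addColumn("partition", "text", (col) => col.notNull())
    .addColumn("stream_id", "text", (col) => col.notNull())
    .addColumn("last_stream_position", "bigint", (col) => col.notNull())
    .addColumn("last_global_position", "bigint", (col) => col.notNull())
    .addColumn("snapshot", "jsonb", (col) => col.notNull()) // your aggregate state
    .addColumn("currency", "text") // optional denormalized column (see mapToColumns)
    .addColumn("total", "numeric") // optional denormalized column (see mapToColumns)
    .addPrimaryKeyConstraint("carts_pk", ["tenant_id", "cart_id", "partition"])
    .execute();
}

export async function down(db: Kysely<any>): Promise<void> {
  await db.schema.dropTable("carts").execute();
}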

Create Event Store

typescript
import { getKyselyEventStore } from "@wataruoguchi/emmett-event-store-kysely";
import { Kysely, PostgresDialect } from "kysely";
import { Pool } from "pg";

const db = new Kysely({
  dialect: new PostgresDialect({
    pool: new Pool({ connectionString: process.env.DATABASE_URL }),
  }),
});

const eventStore = getKyselyEventStore({ 
  db, 
  logger: console,
});

Write Events with Command Handlers

You typically use Emmett's DeciderCommandHandler to create an event handler that wraps the event store and provides domain-specific methods:

typescript
import { DeciderCommandHandler } from "@event-driven-io/emmett";
import type { KyselyEventStore } from "@wataruoguchi/emmett-event-store-kysely";

// Define your domain logic
function createDecide() {
  return (command: CreateCart, state: CartState): CartCreated => {
    // Business logic validation
    if (state.status !== "init") {
      throw new Error("Cart already exists");
    }
    // Return event(s)
    return {
      type: "CartCreated",
      data: {
        eventData: { currency: command.data.currency },
        eventMeta: {
          tenantId: command.data.tenantId,
          cartId: command.data.cartId,
          createdBy: "user-123",
          version: 1,
        },
      },
    };
  };
}

function createEvolve() {
  return (state: CartState, event: CartEvent): CartState => {
    switch (event.type) {
      case "CartCreated":
        return {
          status: "active",
          tenantId: event.data.eventMeta.tenantId,
          cartId: event.data.eventMeta.cartId,
          currency: event.data.eventData.currency,
          items: [],
        };
      // ... other event handlers
      default:
        return state;
    }
  };
}

// Create an event handler for your domain
export function cartEventHandler({
  eventStore,
}: {
  eventStore: KyselyEventStore;
}) {
  const handler = DeciderCommandHandler({
    decide: createDecide(),
    evolve: createEvolve(),
    initialState: () => ({ status: "init", items: [] }),
  });

  return {
    create: (cartId: string, data: { tenantId: string; currency: string }) =>
      handler(
        eventStore,
        cartId,
        { type: "CreateCart", data },
        { partition: data.tenantId, streamType: "cart" },
      ),
    // ... other domain methods
  };
}

// Usage
const eventStore = getKyselyEventStore({ db, logger });
const cartHandler = cartEventHandler({ eventStore });

await cartHandler.create("cart-123", {
  tenantId: "tenant-456",
  currency: "USD",
});

Reading from Read Models

For reading data, you query your read models (projections) through repositories, not directly from the event store:

typescript
// Read from the projected read model table
const cart = await db
  .selectFrom("carts")
  .selectAll()
  .where("cart_id", "=", "cart-123")
  .where("tenant_id", "=", "tenant-456")
  .executeTakeFirst();

// Access full state from the snapshot (cart is undefined if no row matched)
const state = cart?.snapshot as CartState | undefined;

Build Read Models with Snapshot Projections

This package recommends using snapshot projections, which reuse your domain's evolve function to ensure consistency between write and read models:

typescript
import { 
  createSnapshotProjectionRegistry 
} from "@wataruoguchi/emmett-event-store-kysely/projections";

// Reuse your write model's evolve function!
const registry = createSnapshotProjectionRegistry(
  ["CartCreated", "ItemAdded", "CartCheckedOut"],
  {
    tableName: "carts",
    primaryKeys: ["tenant_id", "cart_id", "partition"],
    extractKeys: (event, partition) => ({
      tenant_id: event.data.eventMeta.tenantId,
      cart_id: event.data.eventMeta.cartId,
      partition,
    }),
    evolve: domainEvolve,      // Same function as write model!
    initialState: () => ({ status: "init", items: [] }),
    mapToColumns: (state) => ({ // Optional: denormalize for queries
      currency: state.status !== "init" ? state.currency : null,
      total: state.status === "checkedOut" ? state.total : null,
    }),
  }
);

Process Events (On-Demand)

For on-demand processing (tests, backfills, or scheduled jobs), use the projection runner:

typescript
import { createProjectionRunner } from "@wataruoguchi/emmett-event-store-kysely/projections";

const runner = createProjectionRunner({ 
  db, 
  readStream: eventStore.readStream, 
  registry,
});

await runner.projectEvents("subscription-id", "cart-123", {
  partition: "tenant-456"
});

Note: The projection runner runs on demand: it processes events immediately when called and resolves once the batch is complete, rather than running continuously in the background. This makes it suitable for:

  • Tests - Fast, deterministic execution
  • Production Workers - Scheduled jobs or background workers (see example worker below)
  • Backfills - Reprocessing historical events
  • Manual Triggers - On-demand reprocessing

Production Worker Example

The projection runner can be used in production workers for scheduled or continuous processing:

typescript
// In a worker process
const runner = createProjectionRunner({ db, readStream, registry });

// Process all streams in a partition
while (true) {
  const streams = await db
    .selectFrom("streams")
    .select(["stream_id"])
    .where("partition", "=", partition)
    .limit(50)
    .execute();
  
  for (const stream of streams) {
    await runner.projectEvents("worker-subscription", stream.stream_id, {
      partition,
      batchSize: 200,
    });
  }
  
  await new Promise((resolve) => setTimeout(resolve, 1000)); // Poll interval
}

Process Events (Continuous Background Processing)

For continuous, automatic background processing, use the event consumer:

typescript
import { createKyselyEventStoreConsumer } from "@wataruoguchi/emmett-event-store-kysely";

const consumer = createKyselyEventStoreConsumer({
  db,
  logger,
  consumerName: "carts-read-model",
  batchSize: 100,
  pollingInterval: 1000, // Poll every 1 second
});

// Subscribe to events
consumer.subscribe(async (event) => {
  // Process event
  await processEvent({ db, partition: event.metadata.partition }, event);
}, "CartCreated");

await consumer.start();

API Reference

Event Store

getKyselyEventStore(deps: Dependencies): KyselyEventStore

Creates a new event store instance.

Parameters:

typescript
interface Dependencies {
  db: DatabaseExecutor;  // Kysely database instance
  logger?: Logger;       // Optional logger
  inTransaction?: boolean;
}

Returns: KyselyEventStore - Event store instance

Example:

typescript
const eventStore = getKyselyEventStore({ db, logger });

Event Store Methods

The event store implements Emmett's EventStore interface. Typically, you don't call these methods directly—instead, you use DeciderCommandHandler from Emmett which internally uses these methods:

  • appendToStream() - Appends events to a stream (used internally by DeciderCommandHandler)
  • readStream() - Reads events from a stream (used internally for state reconstruction)
  • aggregateStream() - Rebuilds aggregate state from events (used internally by DeciderCommandHandler)

For most use cases, you'll work with command handlers rather than calling these methods directly. However, they're available if you need lower-level control.
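
If you do need that lower-level control, reading a stream directly looks roughly like the sketch below; the partition option is assumed to behave as in the aggregateStream example that follows.

typescript
// Read raw events from a single stream (illustrative sketch; partition
// assumed to work as in aggregateStream below).
const { events } = await eventStore.readStream("cart-123", {
  partition: "tenant-456",
});

for (const event of events) {
  console.log(event.type, event.data);
}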

aggregateStream(streamName: string, options): Promise<AggregateStreamResult>

Rebuilds aggregate state from events. This is typically used internally by DeciderCommandHandler.

typescript
const result = await eventStore.aggregateStream("cart-123", {
  partition: "tenant-456",
  evolve: (state, event) => { /* ... */ },
  getInitialState: () => ({ status: "init" }),
});

Snapshot Projections

createSnapshotProjectionRegistry(eventTypes, config)

Creates a projection registry for snapshot-based read models.

typescript
const registry = createSnapshotProjectionRegistry(
  ["CartCreated", "ItemAdded"],
  {
    tableName: "carts",
    primaryKeys: ["tenant_id", "cart_id"],
    extractKeys: (event, partition) => ({ /* ... */ }),
    evolve: domainEvolve,
    initialState: () => ({ /* ... */ }),
    mapToColumns: (state) => ({ /* ... */ }), // Optional
  }
);

Projection Runner

createProjectionRunner(deps): ProjectionRunner

Creates a projection runner for on-demand event processing. The runner executes immediately when called and processes events in batches, making it suitable for tests, backfills, or scheduled production jobs.

typescript
const runner = createProjectionRunner({
  db,
  readStream: eventStore.readStream,
  registry,
});

await runner.projectEvents("subscription-id", "stream-id", {
  partition: "tenant-id",
});

Event Consumer

createKyselyEventStoreConsumer(config): KyselyEventStoreConsumer

Creates an event consumer that automatically polls and processes events by global position across all streams. Ideal for continuous, hands-off background processing.

typescript
const consumer = createKyselyEventStoreConsumer({
  db,
  logger,
  consumerName: "my-consumer",
  batchSize: 100,
  pollingInterval: 1000,
});

consumer.subscribe(async (event) => {
  // Handle event
}, "EventType");

await consumer.start();
await consumer.stop(); // Graceful shutdown
