slice-state-view
builds a state-view slice from an event model
| name | slice-state-view |
| description | builds a state-view slice from an event model |
State View Slices are read model projections that build table-based views from events. They consume events and project them into queryable database tables using PostgreSQL.
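To make the idea of a projection concrete, here is a minimal in-memory sketch with no database involved. The event names (`LocationAdded`, `LocationRenamed`, `LocationRemoved`) and the row shape are hypothetical and chosen only for illustration; a real slice writes to a PostgreSQL table instead of a `Map`.

```typescript
// Hypothetical events and row shape, for illustration only.
type LocationEvent =
  | { type: "LocationAdded"; data: { locationId: string; name: string } }
  | { type: "LocationRenamed"; data: { locationId: string; name: string } }
  | { type: "LocationRemoved"; data: { locationId: string } };

type LocationRow = { locationId: string; name: string };

// Apply a single event to the "table" (a Map keyed by the primary key).
function applyLocationEvent(
  table: Map<string, LocationRow>,
  event: LocationEvent
): Map<string, LocationRow> {
  const next = new Map(table);
  switch (event.type) {
    case "LocationAdded":
      // Insert, or overwrite if the key already exists (upsert).
      next.set(event.data.locationId, {
        locationId: event.data.locationId,
        name: event.data.name,
      });
      break;
    case "LocationRenamed": {
      // Update only if the row exists.
      const row = next.get(event.data.locationId);
      if (row) {
        next.set(row.locationId, { ...row, name: event.data.name });
      }
      break;
    }
    case "LocationRemoved":
      next.delete(event.data.locationId);
      break;
  }
  return next;
}

// Fold the whole event stream into the rows a query would return.
function projectLocations(events: LocationEvent[]): LocationRow[] {
  const table = events.reduce(applyLocationEvent, new Map<string, LocationRow>());
  return Array.from(table.values());
}
```

The same three cases (upsert, update, delete) reappear as SQL statements in the projection pattern used by this skill.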
If the processors array in the slice JSON is not empty, treat this as an AUTOMATION slice and load the skill for automation slices.
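The check above can be sketched as a small function. Only the `processors` array comes from the text; the rest of the `SliceDefinition` shape and the name `classifySlice` are assumptions made for this example.

```typescript
// Assumed minimal shape of a slice definition; only `processors` is named in the text.
type SliceDefinition = {
  title?: string;
  processors?: unknown[];
};

type SliceKind = "automation" | "state-view";

// A slice with at least one processor is handled by the automation skill.
function classifySlice(slice: SliceDefinition): SliceKind {
  const hasProcessors =
    Array.isArray(slice.processors) && slice.processors.length > 0;
  return hasProcessors ? "automation" : "state-view";
}
```

A missing `processors` property is treated the same as an empty array here, which is a design choice of this sketch rather than something the slice format guarantees.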
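When creating a state-view slice, you MUST create the following files: the projection ({SliceName}Projection.ts), its tests ({SliceName}.test.ts), the query endpoint (routes.ts), and the database migration (supabase/migrations/V{N}__{tablename}.sql).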
Every projection file follows this pattern:
import {postgreSQLRawSQLProjection} from '@event-driven-io/emmett-postgresql';
import {sql} from '@event-driven-io/dumbo';
import knex, {Knex} from 'knex';
import {EventType} from '../../events/EventType';
Define TypeScript types for the read model:
export type {Name}ReadModelItem = {
    field1?: type,
    field2?: type,
    // fields from projection
}

export type {Name}ReadModel = {
    data: {Name}ReadModelItem[],
}

export const tableName = 'table_name';

export const getKnexInstance = (connectionString: string): Knex => {
    return knex({
        client: 'pg',
        connection: connectionString,
    });
};

export const {Name}Projection = postgreSQLRawSQLProjection<EventType>({
    canHandle: ["Event1", "Event2"], // events this projection handles
    evolve: (event, context) => {
        const {type, data} = event;
        const db = getKnexInstance(context.connection.connectionString);

        switch (type) {
            case "Event1":
                return sql(db(tableName)
                    .withSchema('public')
                    .insert({
                        field1: data.field1,
                        field2: data.field2,
                    })
                    .onConflict('id_field') // upsert on conflict
                    .merge({field1: data.field1, field2: data.field2})
                    .toQuery());

            case "Event2":
                return sql(db(tableName)
                    .withSchema('public')
                    .where('id_field', data.id)
                    .update({
                        field1: data.field1,
                    })
                    .toQuery());

            default:
                return [];
        }
    }
});
Use this pattern for events that create or update records:
    return sql(db(tableName)
        .withSchema('public')
        .insert({ /* fields */ })
        .onConflict('id_field')
        .merge({ /* fields to update */ })
        .toQuery());

Use this for events that only update existing records:

    return sql(db(tableName)
        .withSchema('public')
        .where('id_field', data.id)
        .update({ /* fields */ })
        .toQuery());

Use this for events that remove records:

    return sql(db(tableName)
        .withSchema('public')
        .where('id_field', data.id)
        .delete()
        .toQuery());
Every projection MUST have tests using PostgreSQLProjectionSpec with Testcontainers:
import {before, after, describe, it} from "node:test";
import {PostgreSQLProjectionAssert, PostgreSQLProjectionSpec} from "@event-driven-io/emmett-postgresql";
import {{Name}Projection} from "./{Name}Projection";
import {PostgreSqlContainer, StartedPostgreSqlContainer} from "@testcontainers/postgresql";
import {EventType} from "../../events/EventType"
import knex, {Knex} from 'knex';
import assert from 'assert';
import {runFlywayMigrations} from "../../common/testHelpers";
describe('{Name} Specification', () => {
    let postgres: StartedPostgreSqlContainer;
    let connectionString: string;
    let db: Knex;
    let given: PostgreSQLProjectionSpec<EventType>

    before(async () => {
        postgres = await new PostgreSqlContainer("postgres").start();
        connectionString = postgres.getConnectionUri();

        db = knex({
            client: 'pg',
            connection: connectionString,
        });

        await runFlywayMigrations(connectionString);

        given = PostgreSQLProjectionSpec.for({
            projection: {Name}Projection,
            connectionString,
        });
    });

    after(async () => {
        await db?.destroy();
        await postgres?.stop();
    });

    it('spec: {Name} - scenario', async () => {
        const assertReadModel: PostgreSQLProjectionAssert = async ({connectionString: connStr}) => {
            const queryDb = knex({
                client: 'pg',
                connection: connStr,
            });

            try {
                const result = await queryDb('table_name')
                    .withSchema('public')
                    .select('*');

                assert.strictEqual(result.length, 1);
                // add more assertions
            } finally {
                await queryDb.destroy();
            }
        };

        await given([{
            type: 'EventName',
            data: { /* event data */ },
            metadata: {streamName: 'stream-id'}
        }])
            .when([]) // additional events to process
            .then(assertReadModel);
    });
});
Every read model exposes a GET endpoint to fetch data:
import {Request, Response, Router} from 'express';
import {{Name}ReadModel, tableName} from "./{Name}Projection";
import {WebApiSetup} from "@event-driven-io/emmett-expressjs";
import createClient from "../../supabase/api";
import {readmodel} from "../../core/readmodel";
import {requireUser} from "../../supabase/requireUser";
export const api =
    (
        // external dependencies
    ): WebApiSetup =>
        (router: Router): void => {
            router.get('/api/query/{name}-collection', async (req: Request, res: Response) => {
                try {
                    // requireUser writes the error response itself, so just stop here
                    const principal = await requireUser(req, res, true);
                    if (principal.error) {
                        return;
                    }

                    const userId = principal.user.id;
                    const id = req.query._id?.toString();

                    const supabase = createClient()

                    // Copy the remaining query parameters as filters; _id is handled separately.
                    // (The original started from an empty object, which made the delete a no-op.)
                    const query: any = {...req.query};
                    delete query._id;

                    const data: {Name}ReadModel | {Name}ReadModel[] | null =
                        id ? await readmodel(tableName, supabase).findById<{Name}ReadModel>("id_field", id) :
                            await readmodel(tableName, supabase).findAll<{Name}ReadModel>(query)

                    // Serialize, handling bigint properly
                    const sanitized = JSON.parse(
                        JSON.stringify(data || [], (key, value) =>
                            typeof value === 'bigint' ? value.toString() : value
                        )
                    );

                    return res.status(200).json(sanitized);
                } catch (err) {
                    console.error(err);
                    return res.status(500).json({ok: false, error: 'Server error'});
                }
            });
        };
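The bigint handling in the endpoint is needed because `JSON.stringify` throws a `TypeError` when it meets a `bigint` value and no replacer converts it. A minimal sketch of that step as a standalone helper follows; the name `toJsonSafe` is hypothetical and not part of the template.

```typescript
// Round-trip a value through JSON, turning every bigint into its decimal string.
// null and undefined fall back to an empty array, similar to the `data || []` default in the endpoint.
function toJsonSafe(value: unknown): unknown {
  return JSON.parse(
    JSON.stringify(value ?? [], (_key, v) =>
      typeof v === "bigint" ? v.toString() : v
    )
  );
}
```

Converting to a string rather than a number keeps values that exceed `Number.MAX_SAFE_INTEGER` intact, at the cost of the client having to parse them.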
Each read model requires a migration file in supabase/migrations/:
Naming Convention: V{N}__{tablename}.sql
- V{N} - Version number (sequential: V1, V2, V3, etc.)
- {tablename} - Lowercase table name matching the projection's tableName

Example: V8__locations.sql
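The naming convention can be sketched as two small helpers. Both function names are hypothetical, and taking "highest existing version plus one" as the next version is an assumption about how the sequence is maintained.

```typescript
// Build a migration file name following V{N}__{tablename}.sql.
function migrationFileName(version: number, tableName: string): string {
  if (!Number.isInteger(version) || version < 1) {
    throw new Error(`Invalid migration version: ${version}`);
  }
  return `V${version}__${tableName.toLowerCase()}.sql`;
}

// Derive the next version number from the file names already present.
// Files that do not match the convention are ignored.
function nextMigrationVersion(existingFiles: string[]): number {
  const versions = existingFiles
    .map((file) => /^V(\d+)__.+\.sql$/.exec(file))
    .filter((match): match is RegExpExecArray => match !== null)
    .map((match) => Number(match[1]));
  return versions.length === 0 ? 1 : Math.max(...versions) + 1;
}
```

For example, `migrationFileName(8, "Locations")` yields `V8__locations.sql`, matching the example above.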
-- Create {tablename} table
CREATE TABLE IF NOT EXISTS "public"."{tablename}"
(
    id_field      TEXT PRIMARY KEY,
    field1        TEXT,
    field2        INTEGER,
    field3        TEXT,
    restaurant_id uuid NOT NULL
);
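- Use IF NOT EXISTS for idempotency
- Make sure the column targeted by onConflict() has a PRIMARY KEY or unique constraint
- Include the restaurant_id uuid NOT NULL column (required for multi-tenancy)
- Place the file in the supabase/migrations/ directory

src/slices/{SliceName}/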
├── {SliceName}Projection.ts # Projection logic
├── {SliceName}.test.ts # Tests
└── routes.ts # Query endpoint
supabase/migrations/
└── V{N}__{tablename}.sql # Database schema
In each slice folder, generate a file named .slice.json:
{
    "id": "<slice id>",
    "slice": "<slice title>",
    "context": "<context>",
    "link": "https://miro.com/app/board/<board-id>=/?moveToWidget=<slice id>"
}
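A minimal sketch of producing that object follows. The function name `buildSliceJson` and its parameter names are hypothetical; the field names and the link format are copied from the template above.

```typescript
type SliceJson = {
  id: string;
  slice: string;
  context: string;
  link: string;
};

// Fill the .slice.json template from the slice id, title, context, and Miro board id.
function buildSliceJson(
  sliceId: string,
  title: string,
  context: string,
  boardId: string
): SliceJson {
  return {
    id: sliceId,
    slice: title,
    context,
    link: `https://miro.com/app/board/${boardId}=/?moveToWidget=${sliceId}`,
  };
}
```

Writing the result with `JSON.stringify(sliceJson, null, 4)` gives the file content for `.slice.json`.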
- templates/Locations/ for simple single-event projection example
- templates/Tables/ for multi-event projection with updates
- templates/V8__locations.sql for migration example
- templates/V2__tables.sql for migration example

To build the UI prompt, list the following facts:
to build the UI - use this table "<schema>.<table_name>"
Payload example:
<payload example as JSON>
this is the table definition:
<table definition as SQL DDL>
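Assembling those facts into one prompt string could look like the sketch below. The type `UiPromptFacts` and the function `buildUiPrompt` are hypothetical names; the wording of each line is taken from the template above.

```typescript
type UiPromptFacts = {
  schema: string;
  tableName: string;
  payloadExample: unknown; // one example row, serialized as JSON
  tableDdl: string;        // the CREATE TABLE statement from the migration
};

// Join the facts in the order the template lists them.
function buildUiPrompt(facts: UiPromptFacts): string {
  return [
    `to build the UI - use this table "${facts.schema}.${facts.tableName}"`,
    "Payload example:",
    JSON.stringify(facts.payloadExample, null, 2),
    "this is the table definition:",
    facts.tableDdl.trim(),
  ].join("\n");
}
```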