# Realtime Platform

> Production-grade realtime infrastructure — Sync, Socket, and Database CDC streaming unified under one platform.

This document contains the full content of all documentation pages for AI consumption.

---

## Architecture

**URL:** https://docs.realtime.dev/docs/architecture

**Description:** Understand the monorepo structure, event flow, and core design principles.

The Realtime Platform is a modular, horizontally-scalable system designed around domain-driven subscriptions and a normalized event protocol.

## Design Principles

### Stateless Infrastructure

Application nodes are stateless. All persistent state lives in:

- **PostgreSQL** — metadata, migrations, audit trails, durable sync document storage (`sync_documents` table)
- **Redis** — hot document cache (RedisJSON with TTL), pub/sub fanout, presence, queues (BullMQ)

This enables horizontal scaling by simply adding more API server or worker instances.

### Hybrid Document Storage

Sync documents use a **write-through cache** pattern for optimal latency and durability:

- **Reads**: Redis first (fast path) → cache miss falls back to Postgres → loads result into Redis with TTL
- **Writes**: Written to Redis immediately → asynchronously flushed to Postgres (non-blocking)
- **TTL**: Active documents stay hot in Redis (default: 10 minutes, configurable via `SYNC_DOC_TTL_SECONDS`). Every read/write refreshes the TTL.
- **Dirty sweep**: A background sweep (every 5s) persists unflushed documents to Postgres before TTL expiry.
- **Shutdown**: A final flush on graceful shutdown ensures no data loss.
- **Fallback**: When `REDIS_URL` is not set, the system uses Postgres-only storage automatically.

### Domain-Driven Subscriptions

Clients subscribe to **domain topics** (e.g. `session.status`, `order.updated`), never to internal service details. The platform determines the producing service (Sync, Socket, or Database CDC) internally and delivers events through a unified envelope.
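The read, write, and dirty-sweep behavior described above can be sketched with in-memory `Map`s standing in for Redis and Postgres. This is a minimal illustration of the pattern only — `HybridStore`, `evict`, and the other names here are assumptions, not the platform's actual `HybridDocumentService` API:

```typescript
// Sketch of the write-through cache pattern. In-memory Maps stand in for
// Redis (hot cache) and Postgres (durable store); all names are illustrative.
type SyncDocument = { id: string; revision: number; data: Record<string, unknown> };

class HybridStore {
  private cache = new Map<string, SyncDocument>();   // "Redis" hot cache
  private durable = new Map<string, SyncDocument>(); // "Postgres" durable store
  private dirty = new Set<string>();                 // ids not yet flushed

  // Reads: cache first (fast path); on a miss, fall back to the durable
  // store and repopulate the cache (in the real system, with a TTL).
  read(id: string): SyncDocument | undefined {
    const hot = this.cache.get(id);
    if (hot) return hot;
    const cold = this.durable.get(id);
    if (cold) this.cache.set(id, cold);
    return cold;
  }

  // Writes: land in the cache immediately; the durable write is deferred.
  write(doc: SyncDocument): void {
    this.cache.set(doc.id, doc);
    this.dirty.add(doc.id);
  }

  // Background "dirty sweep": persist unflushed documents before expiry.
  sweep(): void {
    for (const id of this.dirty) {
      const doc = this.cache.get(id);
      if (doc) this.durable.set(id, doc);
    }
    this.dirty.clear();
  }

  // Stand-in for TTL expiry evicting a document from the hot cache.
  evict(id: string): void {
    this.cache.delete(id);
  }
}
```

After a sweep and a cache eviction, a read transparently falls back to the durable store and re-warms the cache — the same sequence the bullets above describe for Redis and Postgres.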
### Explicit Mutations

Writes target the responsible service directly:

```typescript
realtime.sync.mutate(...)      // Sync service
realtime.socket.publish(...)   // Socket service
```

Database writes **never occur through the platform** — the platform only _observes_ database changes via CDC.

## System Architecture

```text
┌──────────────────────────────────────────────────────────────┐
│                        Clients (SDK)                         │
│     subscribe(topic) · sync.create() · socket.publish()      │
└──────────────┬──────────────────────────────┬────────────────┘
               │ HTTP/WS                      │ HTTP/WS
┌──────────────▼──────────────────────────────▼────────────────┐
│                Backend (Express + Socket.IO)                 │
│                                                              │
│  ┌─────────┐  ┌──────────┐  ┌──────────┐  ┌──────────────┐   │
│  │ REST API│  │WS Gateway│  │ Services │  │  Middleware  │   │
│  │ Routes  │  │  Events  │  │ Sync/DB/ │  │  Auth/Scope  │   │
│  │         │  │          │  │  Socket  │  │              │   │
│  └────┬────┘  └────┬─────┘  └────┬─────┘  └──────────────┘   │
│       └────────────┼─────────────┘                           │
│                    │                                         │
│             ┌──────▼──────┐                                  │
│             │ Event Router│                                  │
│             │ Normalize → │                                  │
│             │ Publish →   │                                  │
│             │ Filter →    │                                  │
│             │ Deliver     │                                  │
│             └──────┬──────┘                                  │
└────────────────────┼─────────────────────────────────────────┘
                     │
        ┌────────────┼────────────┐
        │            │            │
   ┌────▼───┐  ┌─────▼────┐   ┌───▼────┐
   │ Redis  │  │PostgreSQL│   │ BullMQ │
   │JSON/PS │  │ Metadata │   │ Queues │
   └────────┘  └──────────┘   └───┬────┘
                                  │
                      ┌───────────▼──────────┐
                      │       Workers        │
                      │   CDC · Outbox ·     │
                      │ Webhooks · Analytics │
                      └──────────────────────┘
```

## Event Flow

Every event in the system follows the same lifecycle: it is normalized into the `RealtimeEvent` envelope, published to Redis Pub/Sub, filtered against registered subscriptions, and delivered to matching subscribers.

## Event Envelope

All services communicate using a normalized envelope:

```typescript
interface RealtimeEvent {
  id: string;        // e.g. evt_a1b2c3d4
  topic: string;     // e.g. session.status
  source: 'database' | 'sync' | 'socket';
  type: string;      // e.g. database.update
  payload: Record<string, unknown>;
  metadata: {
    timestamp: number;
    table?: string;      // database events
    operation?: 'insert' | 'update' | 'delete';
    revision?: number;   // sync events
  };
}
```

**Redis channel pattern:** `realtime:event:{topic}` (e.g.
`realtime:event:session.status`)

## Monorepo Organization

The codebase is organized as a pnpm monorepo with Turborepo orchestration:

```text
realtime/
├── apps/                  # Deployable applications
│   ├── backend/           # Express API + Socket.IO
│   ├── workers/           # Background processors
│   ├── admin-ui/          # React management UI
│   └── docs/              # Documentation (this site)
├── packages/              # Shared libraries
│   ├── shared-types/      # TypeScript interfaces
│   ├── shared-utils/      # Utilities (errors, IDs, etc.)
│   ├── shared-config/     # Zod-validated env config
│   ├── observability/     # Logging (pino) + Prometheus
│   ├── redis-layer/       # Redis client + pub/sub + JSON
│   ├── auth/              # JWT + signing keys + middleware
│   ├── event-router/      # Normalization + routing + filters
│   ├── topic-registry/    # Topic CRUD + validation
│   ├── schema-registry/   # Schema versioning + compatibility
│   ├── metrics/           # Metrics collectors + dashboards
│   ├── database/          # Knex migrations + connection
│   └── sdk/               # Client SDK
└── turbo.json             # Build pipeline config
```

## Multi-Tenant Isolation

The platform supports multi-application tenancy:

- Every API request carries an `X-Application-Id` header
- Backend middleware extracts this into `req.applicationId`
- All entity-owning APIs (topics, schemas, mappings, webhooks) filter by application
- The Admin UI auto-attaches the current application ID from localStorage

## Environment Promotion

Configuration (mappings, schemas, topics) supports a three-stage promotion workflow:

```text
Development → Staging → Production
```

Each entity can be independently promoted between environments. The Environment Grid in the Admin UI provides a visual overview of sync status across all environments.

## Dependency Graph

Turborepo manages the build dependency graph.
Packages build in order of their dependencies:

```text
shared-types
└── shared-utils
    └── shared-config
        ├── observability
        ├── redis-layer
        ├── auth
        ├── event-router
        ├── topic-registry
        ├── schema-registry
        ├── metrics
        └── database
            ├── backend (app)
            ├── workers (app)
            └── sdk
```

## Performance Targets

| Metric | Target |
|--------|--------|
| Write latency | < 50ms p95 |
| Broadcast latency | < 100ms p95 |
| Subscription latency | < 100ms |
| DB event latency | < 150ms |

---

## Configuration

**URL:** https://docs.realtime.dev/docs/configuration

**Description:** All environment variables, configuration options, and setup for each service.

The Realtime Platform uses a Zod-validated configuration system loaded from environment variables. All backend services share the same configuration schema defined in the `@realtime/shared-config` package.

## Environment Variables

### Backend Server

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `NODE_ENV` | No | `development` | `development` \| `staging` \| `production` \| `test` |
| `PORT` | No | `3000` | HTTP server listen port |
| `LOG_LEVEL` | No | `info` | `debug` \| `info` \| `warn` \| `error` |

### PostgreSQL (Platform Database)

The platform's own metadata database for topics, schemas, mappings, keys, migrations, users, etc.

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `DATABASE_URL` | No | — | Full connection string (takes priority if set) |
| `POSTGRES_HOST` | Yes* | `localhost` | PostgreSQL hostname |
| `POSTGRES_PORT` | No | `5432` | PostgreSQL port |
| `POSTGRES_USER` | Yes* | — | PostgreSQL username |
| `POSTGRES_PASSWORD` | Yes* | — | PostgreSQL password |
| `POSTGRES_DB` | Yes* | — | PostgreSQL database name |

### Redis

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `REDIS_URL` | Yes | — | Redis connection URL (e.g. `redis://localhost:6379`) |

Redis must have the **RedisJSON** module loaded for sync document storage.

### Authentication

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `JWT_SECRET` | No | — | Default JWT signing secret |

### Sync Document Cache

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `SYNC_DOC_TTL_SECONDS` | No | `600` | TTL in seconds for hot sync documents in Redis (default: 10 minutes) |

### CDC (Change Data Capture)

These configure the connection to your **application database** — the database whose changes you want to stream in real time. This is separate from the platform's own metadata database.

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `CDC_DATABASE_URL` | No | — | Target application database connection string |
| `CDC_POLL_INTERVAL_MS` | No | `1000` | CDC polling interval in milliseconds |
| `CDC_SLOT_NAME` | No | `realtime_cdc_slot` | PostgreSQL logical replication slot name |

### Admin UI

| Variable | Default | Description |
|----------|---------|-------------|
| `VITE_API_URL` | `http://localhost:3000` | Backend API base URL |

## Example `.env` File

```bash
# Server
NODE_ENV=development
PORT=3000
LOG_LEVEL=debug

# Platform database (Realtime's own metadata)
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=realtime
POSTGRES_PASSWORD=realtime_secret
POSTGRES_DB=realtime

# Redis (with RedisJSON module)
REDIS_URL=redis://localhost:6379

# Sync Document Cache (default: 10 minutes)
# SYNC_DOC_TTL_SECONDS=600

# JWT Authentication
JWT_SECRET=your-jwt-secret-here

# Target database (your application DB for CDC monitoring)
CDC_DATABASE_URL=postgresql://myapp:myapp@localhost:5432/myapp
CDC_POLL_INTERVAL_MS=1000
CDC_SLOT_NAME=realtime_cdc_slot
```

## Configuration Validation

The `@realtime/shared-config` package validates all environment variables at startup using Zod schemas. If a required variable is missing or invalid, the service logs a clear error message and exits.

```typescript
const config = getConfig();
// config.port       → 3000
// config.logLevel   → 'debug'
// config.redis.url  → 'redis://localhost:6379'
// config.postgres.* → database connection details
```

## Docker Compose

The included `docker-compose.yml` provides local development infrastructure:

```yaml
services:
  postgres:
    image: postgres:14-alpine
    environment:
      POSTGRES_USER: realtime
      POSTGRES_PASSWORD: realtime_secret
      POSTGRES_DB: realtime
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis/redis-stack-server:latest
    ports:
      - "6379:6379"

volumes:
  pgdata:
```

### Starting Services

```bash
# Start all infrastructure
docker-compose up -d

# Start only PostgreSQL
docker-compose up -d postgres

# Start only Redis
docker-compose up -d redis

# View logs
docker-compose logs -f

# Stop all
docker-compose down
```

## Knex Configuration

The database connection is managed by Knex in `packages/database/src/knexfile.ts`:

- **Connection pool:** min 2, max 10
- **Migration table:** `knex_migrations`
- **Migration directory:** `packages/database/src/migrations/`

Connection priority:

1. `DATABASE_URL` (full connection string)
2. Individual `POSTGRES_*` variables

## Production Considerations

- Set `NODE_ENV=production` for optimized logging and error handling
- Use `LOG_LEVEL=info` or `warn` to reduce log volume
- Configure Redis with authentication: `redis://user:password@host:6379`
- Use connection pooling for PostgreSQL (the platform uses min 2, max 10 by default)
- Set up proper signing keys via the Admin API instead of relying on `JWT_SECRET`
- Use TLS for all database and Redis connections in production

---

## Core Concepts

**URL:** https://docs.realtime.dev/docs/core-concepts

**Description:** Understand the relationship between Topics, Schemas, Mappings, and how events flow through the platform to subscribers and webhooks.
Before diving into the API reference or service guides, it's important to understand the three foundational building blocks of the Realtime Platform and how they relate to each other.

## The Three Building Blocks

### Topics — The Routing Unit

A **Topic** is a named channel that events are published to. Subscribers (WebSocket clients, webhooks, SDK listeners) listen on topics to receive events. Topics are the **central fan-out point** for all event delivery in the platform.

```text
session.status
order.updated
chat.room.message
proctor.session.started
```

Topics use dot-separated, lowercase names following a domain-driven convention. Every event in the system — regardless of how it was produced — is published to a topic.

### Schemas — The Payload Contract (Optional)

A **Schema** is a versioned JSON Schema definition attached to a specific topic. It defines the expected shape of event payloads published to that topic.

- Each topic can have **multiple schema versions** (1, 2, 3, ...).
- The topic's `activeSchemaVersion` field points to the currently enforced version.
- When a new event is published, the payload is validated against the active schema before delivery.
- Backward compatibility mode ensures new schema versions don't break existing consumers.

### Mappings — The CDC Bridge

A **Mapping** (also called a Domain Mapping) defines how a **database table change** — captured via Change Data Capture (CDC) — is transformed into an event and routed to a topic.

A mapping specifies:

- Which **table** to watch
- Which **operations** to capture (insert, update, delete)
- Which **columns** must change to trigger the event
- What **filter conditions** to apply before routing
- How to **project row columns** into the event payload
- Which **topic** to publish the resulting event to

## How They Relate

```text
┌──────────────┐      N:1       ┌─────────┐      1:N       ┌──────────────┐
│   Mapping    │ ────────────── │  Topic  │ ────────────── │    Schema    │
│ (CDC table → │  publishes to  │ (routing│  validates via │  (versioned  │
│  event xform)│                │ channel)│                │ JSON Schema) │
└──────────────┘                └─────────┘                └──────────────┘
```

| Relationship | Cardinality | Required? |
|---|---|---|
| **Topic → Schemas** | 1 topic has 0..N schema versions | No — schemas are optional |
| **Topic → Mappings** | 1 topic can be targeted by 0..N mappings | No — topics work without CDC |
| **Mapping → Topic** | Each mapping targets exactly 1 topic | Yes — `topic` is required on every mapping |
| **Mapping → Schema** | No direct link — the schema belongs to the topic, not the mapping | N/A |

### Key Takeaways

- **Topics are the center of everything.** They are the single fan-out point that connects producers to consumers.
- **Schemas are optional and belong to topics**, not to mappings. They validate payloads before delivery.
- **Mappings are optional and only used for CDC.** They transform database row changes into topic events.
- In the simplest case (one table, one topic, one schema), the relationship *looks* 1:1:1, but the design supports much more complex topologies.

## The Topic Fan-Out Model

The topic is the **single delivery point** for all event types. Regardless of how an event enters the platform, it lands on a topic and fans out to all interested subscribers.
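The checklist above maps naturally onto a configuration object. The following is an illustrative sketch — the `DomainMapping` field names are assumptions pieced together from this page and the CDC pipeline description (`events[]`, `when` conditions, `$row.column` payload references), not the platform's exact schema:

```typescript
// Hypothetical shape of a domain mapping, mirroring the bullet list above.
interface DomainMapping {
  table: string;                                   // which table to watch
  events: Array<'insert' | 'update' | 'delete'>;   // which operations to capture
  when?: {
    changed?: string[];                            // columns that must change
    eq?: Record<string, unknown>;                  // filter conditions on column values
  };
  payload: Record<string, string>;                 // event field → $row.column projection
  topic: string;                                   // target topic for the resulting event
}

// Example: route status changes on a sessions table to session.status.
const sessionStatusMapping: DomainMapping = {
  table: 'sessions',
  events: ['update'],
  when: { changed: ['status'] },
  payload: { sessionId: '$row.id', status: '$row.status' },
  topic: 'session.status',
};
```

Several such mappings can target the same topic (many tables → one stream), or several can watch the same table with different `when` conditions (one table → many topics), matching the common patterns described later.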
### Event Sources (Producers)

Events can enter a topic from four different sources:

```text
┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐
│   CDC Mapping   │   │   Sync Service  │   │  Socket Gateway │   │  SDK publish()  │
│  (DB row change │   │    (document    │   │  (client sends  │   │    (app code    │
│  → topic event) │   │    mutation)    │   │     message)    │   │    publishes)   │
└────────┬────────┘   └────────┬────────┘   └────────┬────────┘   └────────┬────────┘
         │                     │                     │                     │
         └─────────────────────┴──────────┬──────────┴─────────────────────┘
                                          │
                                          ▼
                                  ┌───────────────┐
                                  │     Topic     │
                                  │   (fan-out    │
                                  │    point)     │
                                  └───────┬───────┘
                                          │
                        ┌─────────────────┼─────────────────┐
                        │                 │                 │
                        ▼                 ▼                 ▼
                ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
                │  WebSocket   │  │   Webhook    │  │  SDK Event   │
                │ Subscribers  │  │  Endpoints   │  │  Listeners   │
                └──────────────┘  └──────────────┘  └──────────────┘
```

| Source | How it enters | Example |
|---|---|---|
| **CDC Mapping** | Database row change is transformed by a mapping and published to the mapping's target topic | `sessions` table update → `session.status` topic |
| **Sync Service** | Document create/update/delete publishes to `sync.{service}.{operation}` | Document merge → `sync.notes.updated` topic |
| **Socket Gateway** | Client publishes a message to a topic via Socket.IO or WebSocket | Chat message → `chat.room.message` topic |
| **SDK `publish()`** | Application code publishes an event directly to a topic | Background job → `report.generated` topic |

### Event Consumers (Subscribers)

When an event lands on a topic, it is delivered to **all** registered subscribers:

| Consumer | How it subscribes | Delivery method |
|---|---|---|
| **WebSocket clients** | `subscribe({ topic: 'session.status' })` via Socket.IO or plain WebSocket | Real-time push over the open connection |
| **Webhook endpoints** | Registered via API/Admin UI with matching event types for the topic | HTTP POST with HMAC-SHA256 signature |
| **SDK event listeners** | `realtime.on('event:session.status', handler)` | Callback invoked with the event payload |

### Schema Validation in the Flow

Schemas act as a **gate** between the producer and the fan-out. When a topic has an active schema version, the event payload is validated **before** it is delivered to subscribers:

```text
Event from any source
        │
        ▼
   ┌──────────┐    ❌ Rejected
   │  Schema  │────────────────▶ (validation error)
   │ Validate │
   └────┬─────┘
        │ ✅ Valid
        ▼
   ┌──────────┐
   │  Topic   │
   │ Fan-out  │
   └──────────┘
        │
        Delivered to all subscribers
```

If the topic has no schema (or `activeSchemaVersion` is 0), the validation step is skipped and events pass through freely.

## Common Patterns

### Pattern 1: Simple CDC → Webhook

Stream a database table change to an external HTTP endpoint.

### Pattern 2: Multiple Tables → One Topic

Aggregate changes from several tables into a single topic for unified consumption.

```text
users table ──────── mapping ───┐
addresses table ──── mapping ───┤──▶ user.profile.changed ──▶ subscribers
preferences table ── mapping ───┘
```

All three mappings target the same topic. Subscribers receive a unified stream of profile-related changes without needing to know which table changed.

### Pattern 3: One Table → Multiple Topics

Route different types of changes from the same table to different topics.

```text
                   ┌── mapping (when status='active') ────▶ session.started
sessions table ────┤
                   ├── mapping (when status='completed') ──▶ session.ended
                   │
                   └── mapping (any update) ──────────────▶ session.audit.log
```

Use mapping filter conditions (`when`) to route different changes to different topics based on column values.

### Pattern 4: SDK Events Without CDC

Publish events directly from application code — no database or mappings needed.

```typescript
// Your application code publishes directly to a topic
realtime.publish('report.generated', {
  reportId: 'rpt_123',
  format: 'pdf',
  generatedBy: 'system',
});

// Subscribers on 'report.generated' receive the event
// Webhook endpoints on 'report.generated' get HTTP POSTs
```

No mapping or CDC setup required — the topic is all you need.

## Quick Reference

| Concept | What it is | Required? | Belongs to |
|---|---|---|---|
| **Topic** | Named routing channel for events | Yes — every event needs a topic | Application + Environment |
| **Schema** | Versioned JSON Schema for payload validation | No — topics work without schemas | A specific Topic |
| **Mapping** | CDC rule: table change → topic event | No — only needed for database CDC | Points to a Topic |
| **Webhook** | HTTP endpoint that receives events | No — one of many subscriber types | Subscribes to Topics |
| **Subscription** | Real-time listener on a topic | No — one of many subscriber types | Subscribes to Topics |

---

## Data Flow & Architecture

**URL:** https://docs.realtime.dev/docs/data-flow

**Description:** Complete data flow diagram showing how events move through every path in the platform, including queues, workers, and independently scalable components.

This page provides a single unified view of **every data path** in the Realtime Platform — from event origination through delivery to clients and external systems — along with which components can be scaled independently.
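Before walking through the full diagram, the topic fan-out with per-subscriber filters can be sketched in a few lines. This is an illustrative miniature, not the platform's actual `SubscriptionRegistry` — only the topic-keyed lookup and the `eq`/`in` filter operators are taken from these docs:

```typescript
// Minimal topic fan-out with per-subscriber eq/in filters (illustrative).
type Payload = Record<string, unknown>;
type EventFilter = Record<string, { eq?: unknown; in?: unknown[] }>;

interface Subscription {
  id: string;
  topic: string;
  filter?: EventFilter;
  handler: (payload: Payload) => void;
}

class MiniRegistry {
  private byTopic = new Map<string, Subscription[]>();

  subscribe(sub: Subscription): void {
    const subs = this.byTopic.get(sub.topic) ?? [];
    subs.push(sub);
    this.byTopic.set(sub.topic, subs);
  }

  // Deliver a payload to every subscription on the topic whose filter matches.
  route(topic: string, payload: Payload): void {
    for (const sub of this.byTopic.get(topic) ?? []) {
      if (matches(sub.filter, payload)) sub.handler(payload);
    }
  }
}

// A subscription with no filter matches everything; otherwise every
// field condition (eq / in) must hold against the payload.
function matches(filter: EventFilter | undefined, payload: Payload): boolean {
  if (!filter) return true;
  return Object.entries(filter).every(([field, cond]) => {
    if (cond.eq !== undefined && payload[field] !== cond.eq) return false;
    if (cond.in !== undefined && !cond.in.includes(payload[field])) return false;
    return true;
  });
}
```

In the real system this lookup sits behind the Event Router: events arrive from CDC, Sync, or Socket sources, and the registry invokes each matching subscriber's handler, which in turn pushes to the client's socket.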
## Full System Diagram ```text ┌─────────────────────────────────────────────────────────────────────────────────────────┐ │ DATA SOURCES │ │ │ │ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │ │ │ Application DB │ │ Client SDK / │ │ Client SDK / │ │ │ │ (PostgreSQL) │ │ REST API │ │ REST API │ │ │ │ │ │ sync.create() │ │ socket.publish()│ │ │ │ INSERT/UPDATE/ │ │ sync.merge() │ │ broadcast() │ │ │ │ DELETE on tables │ │ sync.replace() │ │ │ │ │ └────────┬─────────┘ └────────┬─────────┘ └────────┬─────────┘ │ │ │ │ │ │ │ WAL Stream HTTP / WS HTTP / WS │ └───────────┼───────────────────────┼───────────────────────┼──────────────────────────────┘ │ │ │ ▼ ▼ ▼ ┌───────────────────┐ ┌────────────────────┐ ┌────────────────────────┐ │ CDC Pipeline │ │ Sync Service │ │ Socket Gateway │ │ ┌──────────────┐ │ │ (HybridDocument │ │ ┌──────────────────┐ │ │ │ WAL CDC │ │ │ Service) │ │ │ Socket.IO Server │ │ │ │ Reader │ │ │ │ │ │ (port 3000) │ │ │ │ │ │ │ Redis hot cache + │ │ └────────┬─────────┘ │ │ │ pgoutput │ │ │ Postgres durable │ │ ┌────────┴─────────┐ │ │ │ protocol │ │ │ with revision │ │ │ WebSocket Server │ │ │ └──────┬───────┘ │ │ tracking │ │ │ (path: /ws) │ │ │ │ │ └─────────┬──────────┘ │ └────────┬─────────┘ │ │ ▼ │ │ │ │ │ │ ┌──────────────┐ │ │ │ │ │ │ │ Mapping │ │ │ │ │ │ │ │ Evaluator │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ • match table│ │ │ │ │ │ │ │ • check ops │ │ │ │ │ │ │ │ • apply when │ │ │ │ │ │ │ │ • build │ │ │ │ │ │ │ │ payload │ │ │ │ │ │ │ └──────┬───────┘ │ │ │ │ │ │ │ │ │ │ │ │ └─────────┼─────────┘ │ └───────────┼────────────┘ │ │ │ │ RawEventInput │ RawEventInput │ Channel broadcast │ {topic,source, │ {topic,source, │ via ChannelService │ type,payload} │ type,payload} │ ▼ ▼ ▼ ┌──────────────────────────────────────────────────────────────────────────────────┐ │ EVENT ROUTER │ │ (@realtime/event-router) │ │ │ │ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────────────────────┐ │ │ │ EventNormalizer │──▶│ EventPublisher 
│──▶│ SubscriptionRegistry │ │ │ │ │ │ │ │ │ │ │ │ Assigns evt_ ID │ │ Publishes to │ │ In-memory Map │ │ │ │ Wraps in │ │ Redis Pub/Sub: │ │ │ │ │ │ RealtimeEvent │ │ realtime:event: │ │ • Looks up subs by topic │ │ │ │ envelope │ │ {topic} │ │ • Applies EventFilter │ │ │ │ │ │ │ │ (eq, in operators) │ │ │ └─────────────────┘ └─────────────────┘ │ • Calls sub.handler() │ │ │ │ for each match │ │ │ └──────────────────────────────┘ │ └────────────────────────────────┬─────────────────────────────────────────────────┘ │ Delivered to sub.handler() (per-client callback) │ ┌──────────────────┼──────────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────────────┐ ┌────────────────┐ ┌──────────────────────┐ │ Socket.IO Client │ │ Plain WS Client│ │ Side Effects │ │ │ │ │ │ │ │ socket.emit( │ │ ws.send(JSON) │ │ See "Side Effect │ │ 'event', data) │ │ │ │ Pipelines" below │ └──────────────────┘ └────────────────┘ └──────────────────────┘ ``` ## Three Event Source Paths Every event in the platform originates from exactly one of three sources. Each follows a distinct ingestion path before converging at the Event Router. ### Path 1: Database CDC ```text Application DB (external PostgreSQL) │ │ WAL logical replication (pgoutput protocol) │ Replication slot: realtime_cdc_slot │ Publication: realtime_cdc_pub │ ▼ WalCdcReader (one per unique target DB URL) │ │ Parses XLogData messages → extracts table, operation, old/new row │ Stores raw CdcEvent in cdc_events table (Postgres) │ ▼ Mapping Evaluator (inside WalCdcReader.processChange) │ │ For each active CdcSubscription on this table: │ 1. Load DomainMapping from MappingService │ 2. Check if operation matches mapping.events[] │ 3. Evaluate mapping.when conditions (changed, eq, and/or) │ 4. Build payload by resolving $row.column references │ 5. 
Construct RawEventInput { topic, source:'database', type, payload } │ ▼ EventRouter.route(input) │ ▼ Clients receive event on subscribed topic ``` **Key details:** - CDC database URL is resolved **per-application per-environment** from `Application.config.cdc[env]` - Each unique target database gets its own `WalCdcReader` instance - Leader election (Redis SET NX) ensures only **one backend instance** holds the replication slot - Reader status is persisted in `cdc_reader_status` table (Postgres) ### Path 2: Sync Service ```text Client SDK / REST API │ │ POST /api/sync/{service}/documents (create) │ PUT /api/sync/{service}/documents/{id} (replace) │ PATCH /api/sync/{service}/documents/{id}/merge (merge) │ DELETE /api/sync/{service}/documents/{id} (delete) │ ▼ HybridDocumentService │ │ 1. Check Redis cache first (fast path) │ 2. If cache miss → load from Postgres → cache in Redis with TTL │ 3. Apply mutation (replace / deep-merge / delete) │ 4. Compute diff between old and new data (JsonDiff) │ 5. Increment revision number │ 6. Check expectedRevision for optimistic concurrency (409 on conflict) │ 7. Write to Redis immediately (refresh TTL) │ 8. 
Async non-blocking flush to Postgres │ ▼ publishEvent() → EventRouter.route() │ │ topic: sync.{service}.{operation} │ source: 'sync' │ payload: { service, documentId, revision, data, diff } │ (diff included for update/delete; stripped per subscriber returnMode) │ ▼ Clients receive event on subscribed topic ``` **Key details:** - Documents use a **hybrid Redis + Postgres** architecture: Redis as hot cache, Postgres as durable store - Active documents stay hot in Redis (default TTL: 10 minutes, configurable via `SYNC_DOC_TTL_SECONDS`) - Writes go to Redis immediately → async flush to Postgres (non-blocking) - Background dirty sweep (every 5s) ensures unflushed docs reach Postgres before TTL expiry - Falls back to Postgres-only when `REDIS_URL` is not set - Revision-based optimistic concurrency control - Topic naming convention: `sync.{service}.created`, `sync.{service}.updated`, `sync.{service}.deleted` ### Path 3: Socket Service ```text Client SDK / REST API │ │ socket.emit('subscribe', { topic }) (Socket.IO) │ ws.send({ type: 'subscribe', topic }) (plain WS) │ POST /api/socket/broadcast { channel, event, data } (REST) │ ▼ Socket Gateway (Socket.IO) / WS Gateway (plain WebSocket) │ │ On subscribe: │ 1. Generate sub_ prefixed ID │ 2. Create Subscription { id, clientId, topic, filter, returnMode, handler } │ 3. Register in SubscriptionRegistry (in-memory) │ 4. Join Socket.IO room / track in WS channel map │ 5. Handler wraps applyReturnMode() to strip data/diff per subscriber preference │ │ On broadcast: │ 1. ChannelService.broadcast(channel, event, data) │ 2. Socket.IO: io.to(channel).emit(event, data) │ 3. Plain WS: iterate wsSenders for channel members │ 4. 
Fire broadcastHook → MetricsUpdater → HotTopicsAnalyzer │ ▼ Directly delivered to channel members (no Event Router for raw broadcast) ``` **Key details:** - Two parallel gateways coexist on the same HTTP server: Socket.IO (default) and plain WebSocket (`/ws` path) - JWT authentication on connection handshake (both gateways) - When a client subscribes to a **topic** (not just a channel), the `SubscriptionRegistry` handler routes events from the Event Router directly to that client's socket ## Outbox Pattern (Alternative to CDC) ```text Application code │ │ BEGIN transaction │ INSERT INTO orders ... │ INSERT INTO event_outbox (topic, event_type, payload, table_name, operation) │ COMMIT │ ▼ OutboxWorker (polling worker, runs in workers app) │ │ SELECT * FROM event_outbox WHERE processed_at IS NULL │ ORDER BY id ASC LIMIT 100 FOR UPDATE SKIP LOCKED │ │ For each row: │ 1. Build RawEventInput from outbox row │ 2. EventRouter.route(input) │ 3. UPDATE event_outbox SET processed_at = NOW() │ ▼ EventRouter.route(input) → Clients ``` **Key details:** - `FOR UPDATE SKIP LOCKED` enables **multiple outbox workers** to process concurrently without conflicts - Immediate re-poll when batch returns results (zero-delay tight loop), otherwise polls at configurable interval (default 1s) - Transactional guarantee: the outbox row and the business write are in the same transaction ## Side Effect Pipelines After events flow through the Event Router to clients, several side-effect systems process events asynchronously. ### Webhook Delivery ```text Gateway event (connect, disconnect, subscribe, broadcast) │ ▼ WebhookDispatcher.dispatch(event) ← fire-and-forget (non-blocking) │ │ 1. Query WebhookEndpointStore for active endpoints │ matching event type (or wildcard '*') │ 2. Filter by applicationId + environment scope │ 3. For each matching endpoint: │ a. JSON.stringify(event) │ b. HMAC-SHA256 signature: t={timestamp},v1={hash} │ c. POST to endpoint.url with retry (up to 3 attempts) │ d. 
Exponential backoff between retries │ 4. Log result to webhook_logs table (Postgres) │ 5. Fire onSuccess/onFailure → MetricsUpdater │ ▼ External HTTP endpoint receives signed webhook payload ``` ### Webhook Delivery via BullMQ (Workers App) ```text Event enqueued to 'webhook-delivery' queue (BullMQ / Redis) │ ▼ WebhookDeliveryWorker (in workers app) │ │ 1. Dequeue job from BullMQ │ 2. Look up endpoint from WebhookRegistry │ 3. Sign payload with HMAC-SHA256 │ 4. POST to endpoint URL with timeout + retry │ 5. Exponential backoff: min(1000 * 2^attempt + jitter, 60s) │ 6. Failed after maxAttempts → Dead Letter Queue (in-memory) │ ▼ External HTTP endpoint ``` ### Analytics Pipeline ```text Events from any source │ ▼ AnalyticsWorker (in workers app) │ │ ingest(event) → buffer + rolling aggregations │ │ Periodic flush (default 60s): │ • Drain buffer │ • Emit per-topic 1-minute aggregations │ • { topic, window:'1m', count, startTime, endTime } │ ▼ Analytics storage / downstream consumers ``` ### Alert Pipeline ```text MetricsUpdater (5s tick in backend) │ │ checkAlerts(): │ • webhookFailureRate > 50% → critical alert │ • webhookFailureRate > 20% → warning alert │ • activeConnections > 10,000 → warning alert │ ▼ AlertStore (in-memory, ring buffer of 1000) │ ▼ GET /api/alerts → Admin UI Alert Center ``` ### Metrics Pipeline ```text Every gateway event, broadcast, webhook result │ ▼ MetricsUpdater (5s tick) │ │ Reads live state from: │ • ChannelService → connection count, channel list │ • SubscriptionRegistry → subscription count │ • CdcService → events received/routed, replication lag │ • WebhookDispatcher callbacks → success/failure counts │ │ Computes rates and pushes into: │ • DashboardMetrics → delivery, reliability, pipeline metrics │ • HotTopicsAnalyzer → per-topic event count, fanout, latency │ ▼ GET /api/metrics/dashboard → Admin UI Dashboard GET /api/metrics/hot-topics → Admin UI Hot Topics /metrics → Prometheus scrape endpoint ``` ## Redis Pub/Sub Cross-Instance 
Fanout When running multiple backend instances, Redis Pub/Sub ensures events reach all connected clients regardless of which instance they're connected to: ```text Instance A Redis Instance B ┌──────────┐ ┌────────────┐ ┌──────────┐ │ CDC event │──EventPublisher──▶│ PUBLISH │ │ │ │ on topic │ │ realtime: │──SUBSCRIBE────────▶│ Receives │ │ session. │ │ event: │ │ event, │ │ status │ │ session. │ │ delivers │ │ │ │ status │ │ to local │ │ │ │ │ │ subs │ └──────────┘ └────────────┘ └──────────┘ ``` **Channel pattern:** `realtime:event:{topic}` (e.g. `realtime:event:session.status`) ## Data Stores | Store | Technology | Purpose | Scaled By | |-------|-----------|---------|-----------| | **Platform DB** | PostgreSQL | Metadata: topics, schemas, mappings, users, sessions, CDC status, webhook logs, durable sync document storage, deployments | Vertical scaling, read replicas | | **Target DB(s)** | PostgreSQL | Application databases monitored by CDC (external, one per app per env) | Independent of platform | | **Redis** | Redis + RedisJSON | Hot sync document cache (TTL-based), Pub/Sub fanout, leader election locks, BullMQ job queues, presence | Redis Cluster for horizontal scaling | ## Independently Scalable Components The platform is designed so each component can be scaled independently based on load characteristics: ```text ┌─────────────────────────────────────────────────────────────────────┐ │ SCALABILITY MAP │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Backend Instances (N) │ │ │ │ │ │ │ │ ✅ REST API — stateless, scale out │ │ │ │ ✅ Socket Gateway — stateless with Redis │ │ │ │ ✅ WS Gateway — stateless with Redis │ │ │ │ ⚠️ CDC Reader — ONE leader instance │ │ │ │ ✅ Webhook Dispatch — async, non-blocking │ │ │ │ ✅ Metrics Updater — per-instance local │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Worker Instances (M) │ │ │ │ │ │ │ │ ✅ Outbox Worker — scale with SKIP LOCKED│ │ │ │ 
│   │  ✅ Webhook Worker   — scale via BullMQ     │                   │
│   │  ✅ Analytics Worker — independent flush    │                   │
│   │  ✅ Alert Worker     — per-instance rules   │                   │
│   └─────────────────────────────────────────────┘                   │
│                                                                     │
│   ┌─────────────────────────────────────────────┐                   │
│   │               Infrastructure                │                   │
│   │                                             │                   │
│   │  ✅ PostgreSQL       — read replicas        │                   │
│   │  ✅ Redis            — Redis Cluster        │                   │
│   │  ✅ Admin UI         — static SPA, CDN      │                   │
│   └─────────────────────────────────────────────┘                   │
└─────────────────────────────────────────────────────────────────────┘
```

### Component Scaling Details

## Event Envelope Reference

Every event flowing through the platform uses this normalized envelope:

```typescript
interface RealtimeEvent {
  id: string;                       // e.g. evt_a1b2c3d4
  topic: string;                    // e.g. session.status
  source: 'database' | 'sync' | 'socket';
  type: string;                     // e.g. database.update
  payload: Record<string, unknown>; // sync events: { service, documentId, revision, data?, diff? }
  metadata: {
    timestamp: number;
    table?: string;                 // database events only
    operation?: 'insert' | 'update' | 'delete';
    revision?: number;              // sync events only
  };
}
```

## End-to-End Latency Breakdown

```text
Database CDC Path (target: < 150ms end-to-end):
  WAL write → pgoutput decode      ~10-30ms
  Mapping evaluation               ~1-5ms
  EventRouter normalize+publish    ~5-15ms
  Redis Pub/Sub fanout             ~1-5ms
  Socket delivery to client        ~5-20ms

Sync Service Path (target: < 50ms):
  HTTP request parsing             ~1-2ms
  Redis cache read (hit)           ~1-3ms   (or Postgres fallback ~10-25ms)
  Redis cache write + TTL          ~1-3ms
  Async Postgres flush             ~0ms     (non-blocking, background)
  EventRouter normalize+publish    ~5-15ms
  Socket delivery to client        ~5-20ms

Socket Broadcast Path (target: < 100ms):
  Message received by gateway      ~1-2ms
  Channel broadcast via IO         ~5-30ms  (depends on subscriber count)
  Redis cross-instance fanout      ~1-5ms   (if multi-instance)
```

## Complete Wiring Summary

This table shows how every major component is wired together at startup in `apps/backend/src/index.ts`:

| Component | Created From | Wired To |
|-----------|-------------|----------| | `EventRouter` | noopPublisher + subscriptionRegistry | CdcService, HybridDocumentService | | `SubscriptionRegistry` | standalone (in-memory) | EventRouter, Socket Gateway, WS Gateway, Socket API | | `ChannelService` | Socket.IO Server | Socket Gateway, WS Gateway, MetricsUpdater | | `CdcService` | stores + applicationService + mappingService + eventRouter | CDC API, MetricsUpdater | | `WalCdcReader` | created on-demand by readerFactory | CdcService (per target DB) | | `CdcLeaderElection` | Redis clients (main + sub) | CdcService (optional) | | `WebhookDispatcher` | webhookStore + logStore + callbacks | Socket Gateway, WS Gateway | | `MetricsUpdater` | dashboard + hotTopics + channelService + subscriptionRegistry | Broadcast hook, periodic 5s tick | | `HybridDocumentService` | RedisJsonClient + Knex db + EventRouter (when REDIS_URL set) | Sync API routes | | `PgDocumentService` | Knex db (fallback when no REDIS_URL) | Sync API routes | | `JwtService` | SigningKeyStore (pre-loaded from Postgres) | WS Gateway auth, Auth middleware | --- ## Realtime Platform **URL:** https://docs.realtime.dev/docs **Description:** Production-grade realtime infrastructure — Sync, Socket, and Database CDC streaming unified under one platform. The **Realtime Platform** is a production-grade infrastructure system that provides three unified realtime capabilities under a single, horizontally-scalable architecture. ## Quick Links ## How It Works Clients subscribe to **domain topics** (e.g. `session.status`, `order.updated`), not internal services. The platform determines the producing service internally and routes all events through a normalized `RealtimeEvent` envelope via Redis Pub/Sub. 
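To make the envelope flow concrete, here is a self-contained sketch of subscribe → normalize → fan out. It is illustrative only: a plain in-process `Map` stands in for Redis Pub/Sub, and the helper names (`subscribe`, `publish`, `channelFor`) are hypothetical, not the platform's API. Only the `RealtimeEvent` shape and the `realtime:event:{topic}` channel pattern come from the docs above.

```typescript
import { randomBytes } from "node:crypto";

// Normalized envelope (mirrors the RealtimeEvent interface above).
interface RealtimeEvent {
  id: string;
  topic: string;
  source: "database" | "sync" | "socket";
  type: string;
  payload: Record<string, unknown>;
  metadata: {
    timestamp: number;
    table?: string;
    operation?: "insert" | "update" | "delete";
    revision?: number;
  };
}

// In-memory stand-in for Redis Pub/Sub fanout (illustrative only).
const channels = new Map<string, Array<(e: RealtimeEvent) => void>>();

// Channel pattern from the docs: realtime:event:{topic}
const channelFor = (topic: string) => `realtime:event:${topic}`;

// A client registers interest in a domain topic, never an internal service.
function subscribe(topic: string, handler: (e: RealtimeEvent) => void): void {
  const ch = channelFor(topic);
  if (!channels.has(ch)) channels.set(ch, []);
  channels.get(ch)!.push(handler);
}

// Normalize a raw producer event into the envelope, then fan out on the channel.
function publish(
  topic: string,
  source: RealtimeEvent["source"],
  type: string,
  payload: Record<string, unknown>
): RealtimeEvent {
  const event: RealtimeEvent = {
    id: `evt_${randomBytes(4).toString("hex")}`, // e.g. evt_a1b2c3d4
    topic,
    source,
    type,
    payload,
    metadata: { timestamp: Date.now() },
  };
  for (const handler of channels.get(channelFor(topic)) ?? []) handler(event);
  return event;
}

// Usage: a client subscribes to a domain topic; a CDC change fans out to it.
subscribe("session.status", (e) => console.log(e.topic, e.type));
publish("session.status", "database", "database.update", {
  sessionId: "s1",
  status: "active",
});
```

The point of the indirection is that the subscriber never learns whether the event came from CDC, Sync, or Socket; only `source` in the envelope records it.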
```text Realtime Platform ├── API Server (Express + Socket.IO) ├── Socket Gateway ├── Event Router ├── Hosted Redis │ ├── RedisJSON (document state) │ ├── Pub/Sub (event fanout) │ └── BullMQ (job queues) ├── CDC / Outbox Workers ├── Webhook Dispatcher ├── Topic Registry ├── Schema Registry ├── Metrics Collector ├── Event Debugger & Replay ├── Admin API └── Management UI (React + MUI) ``` ## Key Features - **Domain-driven subscriptions** — Subscribe to business topics, not infrastructure channels - **Multi-tenant isolation** — Application-scoped data with environment promotion (dev → staging → prod) - **Versioned schemas** — JSON Schema validation with backward compatibility enforcement - **CDC streaming** — Stream PostgreSQL row changes to subscribers in real time - **Webhook delivery** — HMAC-signed webhook dispatch with retries and dead-letter queues - **Event debugging** — Trace, inspect, and replay events through the debugger - **Full Admin UI** — 17+ pages for managing every aspect of the platform - **User authentication** — Session-based auth with RBAC, MFA, and invite system - **Operational dashboards** — Real-time delivery, pipeline, and reliability metrics ## Technology Stack | Layer | Technology | |-------|-----------| | **Runtime** | Node.js >= 20, TypeScript | | **API** | Express, Socket.IO | | **Database** | PostgreSQL >= 14 | | **Cache/Pub-Sub** | Redis >= 7.0 (with RedisJSON) | | **Queue** | BullMQ | | **Build** | pnpm workspaces, Turborepo | | **Testing** | Vitest | | **Admin UI** | React, Material UI | | **SDK** | `@smarterservices/realtime` | --- ## Installation **URL:** https://docs.realtime.dev/docs/installation **Description:** Complete installation guide with prerequisites, environment setup, and verification. This guide walks through every step needed to get the Realtime Platform running from scratch. 
## Prerequisites | Dependency | Version | Purpose | |-----------|---------|---------| | **Node.js** | >= 20.0.0 | Runtime for all services | | **pnpm** | >= 9.0.0 | Monorepo package manager | | **PostgreSQL** | >= 14 | Platform metadata + CDC source | | **Redis** | >= 7.0 (with RedisJSON) | Document state, pub/sub, queues | ### Install Node.js We recommend using [nvm](https://github.com/nvm-sh/nvm) to manage Node.js versions: ```bash nvm install 20 nvm use 20 node --version # v20.x.x ``` ### Install pnpm ```bash npm install -g pnpm@9 pnpm --version # 9.x.x ``` ### PostgreSQL ### Redis with RedisJSON The platform requires Redis with the **RedisJSON** module for sync document storage. ## Clone the Repository ```bash git clone realtime cd realtime ``` ## Install Dependencies ```bash pnpm install ``` This installs dependencies for all packages and apps in the monorepo. pnpm's workspace protocol ensures shared dependencies are hoisted efficiently. ## Environment Configuration Copy the example environment file: ```bash cp .env.example .env ``` ### Required Variables ```bash # PostgreSQL (platform metadata database) POSTGRES_HOST=localhost POSTGRES_PORT=5432 POSTGRES_USER=realtime POSTGRES_PASSWORD=realtime_secret POSTGRES_DB=realtime # Redis REDIS_URL=redis://localhost:6379 ``` ### Optional Variables ```bash # Server NODE_ENV=development PORT=3000 LOG_LEVEL=debug # JWT Authentication JWT_SECRET=your-jwt-secret-here # CDC (Change Data Capture) - your application database CDC_DATABASE_URL=postgresql://user:password@host:5432/your_app_db CDC_POLL_INTERVAL_MS=1000 CDC_SLOT_NAME=realtime_cdc_slot # Admin UI VITE_API_URL=http://localhost:3000 ``` ## Build Build all packages in dependency order: ```bash pnpm build ``` Turborepo handles the build graph automatically — shared packages build first, then apps. ## Verify Installation ### Run Tests ```bash pnpm test ``` All 221+ tests should pass across 12 packages. 
### Start the Platform ```bash pnpm dev ``` Verify each service is running: | Service | Check | Expected | |---------|-------|----------| | **Backend** | `curl http://localhost:3000/health` | `{"status":"ok"}` | | **Admin UI** | Open `http://localhost:5173` | Login/setup page | | **Workers** | Check terminal output | `Workers started` log | ## Docker Compose (Full Stack) The included `docker-compose.yml` provides PostgreSQL and Redis for local development: ```yaml services: postgres: image: postgres:14-alpine environment: POSTGRES_USER: realtime POSTGRES_PASSWORD: realtime_secret POSTGRES_DB: realtime ports: - "5432:5432" redis: image: redis/redis-stack-server:latest ports: - "6379:6379" ``` Start both services: ```bash docker-compose up -d ``` ## Monorepo Structure After installation, the workspace contains: ```text realtime/ ├── apps/ │ ├── backend/ # Express API + Socket.IO server │ ├── workers/ # Background workers (CDC, webhooks, etc.) │ ├── admin-ui/ # React management dashboard │ └── docs/ # This documentation site ├── packages/ │ ├── shared-types/ # TypeScript interfaces │ ├── shared-utils/ # Common utilities │ ├── shared-config/ # Environment config (Zod) │ ├── observability/ # Logging + metrics │ ├── redis-layer/ # Redis client + pub/sub │ ├── auth/ # JWT + permissions │ ├── event-router/ # Event normalization + routing │ ├── topic-registry/ # Topic CRUD + validation │ ├── schema-registry/ # Schema versioning + compatibility │ ├── metrics/ # Prometheus metrics │ ├── database/ # Knex migrations + connection │ └── sdk/ # Client SDK ├── docker-compose.yml ├── turbo.json └── pnpm-workspace.yaml ``` ## Troubleshooting --- ## Packages **URL:** https://docs.realtime.dev/docs/packages **Description:** Reference guide for all 12 shared packages in the Realtime Platform monorepo. The Realtime Platform monorepo contains 12 shared packages under `packages/`. Each package is independently versioned and built, with Turborepo managing the dependency graph. 
## @realtime/shared-types

TypeScript interfaces shared across all packages. No runtime code — types only.

**Key Types:**

| Type | Description |
|------|-------------|
| `RealtimeEvent` | Normalized event envelope |
| `TopicDefinition` | Topic registry record |
| `SchemaDefinition` | Versioned JSON Schema record |
| `DomainMapping` | Database-to-topic mapping config |
| `SubscriptionFilter` | Client-side event filter |
| `EventEnvelope` | Client-facing event wrapper |
| `WebhookEndpoint` | Webhook endpoint configuration |
| `WebhookDeliveryLog` | Delivery attempt record |
| `Application` | Multi-tenant application record |
| `User`, `Role`, `Permission` | Auth system types |
| `Deployment`, `DeploymentItem` | Environment promotion types |

## @realtime/shared-utils

Common utilities used across the platform.

**Error Classes:**

```typescript
throw new NotFoundError('Topic not found: session.status');
throw new ConflictError('Document revision mismatch');
throw new ValidationError('Invalid topic name format');
```

**Object Helpers:**

```typescript
const merged = deepMerge(base, override);
const cloned = deepClone(original);
const subset = pick(obj, ['name', 'email']);
const filtered = omit(obj, ['password']);
```

**ID Generation:**

```typescript
const eventId = generatePrefixedId('evt'); // evt_a1b2c3d4
const docId = generatePrefixedId('doc');   // doc_x9y8z7w6
```

**Date Helpers:**

## @realtime/shared-config

Zod-validated environment configuration. Reads from environment variables and provides a typed config object.

```typescript
const config = getConfig();
// config.port       → 3000
// config.logLevel   → 'debug'
// config.redis.url  → 'redis://localhost:6379'
// config.postgres.* → database connection details
```

If a required variable is missing or invalid, the validation throws a clear error at startup.

## @realtime/observability

Logging and Prometheus metrics.
**Logging (pino):** ```typescript const logger = getLogger(); logger.info('Server started'); logger.error({ err }, 'Request failed'); ``` **Metrics (Prometheus):** ```typescript const eventsReceived = createCounter({ name: 'realtime_events_received_total', help: 'Total events received', labelNames: ['topic', 'source'], }); eventsReceived.inc({ topic: 'session.status', source: 'database' }); ``` All metric factories are **idempotent** — calling `createCounter` with the same name returns the existing metric. ## @realtime/redis-layer Redis client, Pub/Sub, RedisJSON, and key patterns. **RedisJSON Client:** ```typescript await client.set('syncdoc:data:session:doc1', { status: 'active' }); const doc = await client.get('syncdoc:data:session:doc1'); await client.merge('syncdoc:data:session:doc1', { notes: 'updated' }); await client.del('syncdoc:data:session:doc1'); // TTL management (used by HybridDocumentService for cache expiry) await client.expire('syncdoc:data:session:doc1', 600); // Access underlying ioredis client for advanced operations const raw = client.getRawClient(); ``` **Pub/Sub Manager:** ```typescript const pubsub = new PubSubManager(redisUrl); pubsub.subscribe('realtime:event:session.status', (event) => { console.log('Received:', event); }); pubsub.publish('realtime:event:session.status', eventData); ``` **Key Patterns:** ```typescript RedisKeys.syncDocument('session', 'doc1'); // syncdoc:data:session:doc1 RedisKeys.eventChannel('session.status'); // realtime:event:session.status ``` **Queue Manager (BullMQ):** ```typescript const queue = QueueManager.createQueue('webhook-delivery'); await queue.add('deliver', { webhookId: 'wh_123', event: {...} }); ``` ## @realtime/auth JWT signing/verification, signing key management, and Express middleware. 
```typescript const keyStore = new SigningKeyStore(); const jwt = new JwtService(keyStore); const token = jwt.sign({ sub: 'user123', permissions: ['subscribe'] }); const claims = jwt.verify(token); ``` **Express Middleware:** ```typescript app.use('/api', authMiddleware(jwtService)); // req.user is now populated with JWT claims ``` **Permission Roles:** `admin`, `service`, `client` ## @realtime/event-router Event normalization, routing, filtering, and subscription management. **Components:** | Class | Purpose | |-------|---------| | `EventNormalizer` | Converts raw input to `RealtimeEvent` with unique IDs | | `EventPublisher` | Publishes events to Redis Pub/Sub | | `EventFilter` | Applies subscription filters (equality, `in` operator) | | `SubscriptionRegistry` | In-memory subscription store by topic and client | | `EventRouter` | Orchestrates the full pipeline | ```typescript const router = new EventRouter({ publisher, normalizer, filter, registry }); await router.route(rawEvent); ``` ## @realtime/topic-registry Topic CRUD and validation. ```typescript const service = new TopicRegistryService(store); await service.create({ name: 'session.status', description: '...', owner: 'team-a' }); const topics = await service.list(); const topic = await service.get('session.status'); ``` **Validation Rules:** - Names must be dot-separated, lowercase - Format: `segment.segment` (letters, numbers, dots) - Minimum one dot separator ## @realtime/schema-registry Schema registration, versioning, validation, and backward compatibility checking. ```typescript const service = new SchemaRegistryService(store); await service.register({ topic: 'session.status', schema: { type: 'object', properties: { ... } }, compatibilityMode: 'backward', }); const result = await service.validate('session.status', payload); // { valid: true } or { valid: false, errors: [...] 
} ``` **Compatibility Modes:** `backward` (default), `none` ## @realtime/metrics Prometheus metric collectors, dashboard aggregation, and hot topic analysis. ```typescript const dashboard = new DashboardMetrics(collector); const stats = dashboard.getStats(); // { delivery: {...}, pipeline: {...}, reliability: {...} } const hot = new HotTopicsAnalyzer(); hot.recordEvent('session.status'); const hotTopics = hot.getHotTopics(); // [{ topic, eventsPerSec, subscriberCount, p95Fanout }] ``` ## @realtime/database Knex migration system and PostgreSQL connection management. ```typescript const db = getDatabase(); // Singleton Knex instance await runMigrations(); // Apply pending migrations await closeDatabase(); // Graceful shutdown ``` **Migration Commands:** ```bash pnpm --filter @realtime/database migrate:latest # Apply all pending pnpm --filter @realtime/database migrate:status # Check status pnpm --filter @realtime/database migrate:rollback # Rollback last batch pnpm --filter @realtime/database migrate:make name # Create new migration ``` **Connection Pool:** min 2, max 10 ## @smarterservices/realtime (SDK) Client SDK for browser and Node.js. See the full [SDK documentation](/docs/sdk/overview). --- ## Quick Start **URL:** https://docs.realtime.dev/docs/quickstart **Description:** Get the Realtime Platform running locally in 5 minutes. Get the entire platform — backend API, workers, and admin UI — running on your machine with a single command. ## Prerequisites Before you start, make sure you have: - **Node.js** >= 20.0.0 - **pnpm** >= 9.0.0 - **PostgreSQL** >= 14 - **Redis** >= 7.0 (with RedisJSON module) ## Setup ## First Steps After Setup ### Open the Admin UI Navigate to [http://localhost:5173](http://localhost:5173). On first launch you'll be prompted to create the initial admin account. 
### Create Your First Topic

### Subscribe with the SDK

```bash
npm install @smarterservices/realtime
```

```typescript
import { RealtimeClient } from '@smarterservices/realtime';

const realtime = new RealtimeClient({
  url: 'http://localhost:3000',
  token: 'your-jwt-token',
});

realtime.subscribe({ topic: 'session.status' });

realtime.on('event:session.status', (event) => {
  console.log('Session update:', event.payload);
});
```

## Next Steps

---

## Security

**URL:** https://docs.realtime.dev/docs/security

**Description:** JWT authentication, signing key rotation, RBAC, MFA, webhook signing, and multi-tenant isolation.

The Realtime Platform provides a comprehensive security model covering authentication, authorization, data isolation, and webhook integrity.

## Authentication

### JWT Tokens

Service-to-service and SDK connections authenticate via JWT tokens:

- Tokens are signed with HMAC-SHA256 using managed signing keys
- Each token includes a `kid` (key ID) header for key rotation support
- Tokens carry permission claims (`admin`, `service`, `client`)

```typescript
// Issue a token
POST /auth/token
{ "applicationId": "app_abc", "permissions": ["subscribe"] }

// Verify a token
POST /auth/verify
{ "token": "eyJhbGci..." }
```

### Session-Based Auth (Admin UI)

The Admin UI uses session-based authentication:

1. User logs in with email/password → session token created
2. Session token stored in `localStorage` and attached to all API requests
3. Sessions stored in PostgreSQL with expiration
4. Logout invalidates the session server-side

### Multi-Factor Authentication (MFA)

TOTP-based MFA compatible with Google Authenticator, Authy, and 1Password:

- Admin can enable MFA per user
- QR code setup flow with TOTP verification
- MFA challenge on login when enabled
- Admin can disable MFA for any user

## Signing Key Management

JWT signing keys support rotation without downtime:

The **Rotate** operation combines steps 1 and 3 atomically — creates a new key and deactivates the old one in a single call.
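Rotation without downtime works because each token names its signing key in the `kid` header: verification looks the key up by `kid`, so tokens minted before rotation stay verifiable until they expire, while new tokens use the new key. Here is a minimal HS256 sketch with `node:crypto`; the in-memory key map and the `sign`/`verify` helpers are hypothetical stand-ins, not the platform's `JwtService`/`SigningKeyStore` API.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical in-memory key store; the real SigningKeyStore is Postgres-backed.
// Deactivated keys stop signing but still verify (tokens remain valid until expiry).
const keys = new Map<string, { secret: string; active: boolean }>([
  ["key_old", { secret: "old-secret", active: false }], // rotated out
  ["key_new", { secret: "new-secret", active: true }],
]);

const b64url = (data: Buffer | string) => Buffer.from(data).toString("base64url");

function sign(claims: Record<string, unknown>): string {
  // Pick the single active key and embed its id in the `kid` header.
  const [kid, key] = [...keys].find(([, k]) => k.active)!;
  const header = b64url(JSON.stringify({ alg: "HS256", typ: "JWT", kid }));
  const body = b64url(JSON.stringify(claims));
  const sig = createHmac("sha256", key.secret)
    .update(`${header}.${body}`)
    .digest("base64url");
  return `${header}.${body}.${sig}`;
}

function verify(token: string): Record<string, unknown> {
  const [header, body, sig] = token.split(".");
  const { kid } = JSON.parse(Buffer.from(header, "base64url").toString());
  const key = keys.get(kid); // deactivated keys still verify; unknown kids do not
  if (!key) throw new Error(`unknown kid: ${kid}`);
  const expected = createHmac("sha256", key.secret)
    .update(`${header}.${body}`)
    .digest();
  if (!timingSafeEqual(Buffer.from(sig, "base64url"), expected)) {
    throw new Error("bad signature");
  }
  return JSON.parse(Buffer.from(body, "base64url").toString());
}

const token = sign({ sub: "user123", permissions: ["subscribe"] });
console.log(verify(token).sub); // → user123
```

Because lookup is by `kid` rather than "the current key", rotation never invalidates in-flight tokens; only deleting a key from the store would.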
### Key Storage Keys are stored in PostgreSQL (`signing_keys` table) and loaded into memory on startup. The `PgSigningKeyStore` provides async CRUD while the in-memory `SigningKeyStore` provides sync access for hot-path JWT operations. ## Role-Based Access Control (RBAC) ### Built-in Roles | Role | Description | |------|-------------| | `super_admin` | Full system access including user management and key rotation | | `admin` | Manage all entities, users, but limited key access | | `editor` | Create and modify topics, schemas, mappings, webhooks | | `viewer` | Read-only access to all entities | ### Permission Model Permissions are defined as `(resource, action)` pairs: **Resources:** `topics`, `schemas`, `mappings`, `webhooks`, `users`, `roles`, `invites`, `keys`, `deployments`, `debugger`, `documents`, `socket` **Actions:** `read`, `create`, `update`, `delete` ### Custom Roles Create custom roles with specific permission combinations via the Admin UI or API: ```json { "name": "webhook_manager", "permissions": [ { "resource": "webhooks", "action": "read" }, { "resource": "webhooks", "action": "create" }, { "resource": "webhooks", "action": "update" }, { "resource": "webhooks", "action": "delete" } ] } ``` ### Middleware The `requirePermission` middleware checks permissions on protected routes: ```typescript router.post('/api/topics', requirePermission('topics', 'create'), handler); ``` ## Multi-Tenant Isolation ### Application Scoping Every API request carries an `X-Application-Id` header: - Backend middleware extracts it into `req.applicationId` - All entity-owning APIs filter lists and tag creates by application - The Admin UI auto-attaches the header from the currently selected application ### Scoped Entities | Entity | App-Scoped | Env-Scoped | |--------|-----------|-----------| | Topics | Yes | Yes | | Schemas | Yes | Yes | | Mappings | Yes | Yes | | Webhooks | Yes | Yes | | Webhook Logs | Yes | Yes | | DB Subscriptions | Yes | Yes | | Deployments | Yes | 
Cross-env |
| Sync/Documents | Via query | Via query |
| Socket/Debugger | No (operational) | No |

### Environment Scoping

Data is further isolated by environment (`development`, `staging`, `production`). The `X-Environment` header controls which environment's data is accessed.

## Webhook Signing

Every webhook delivery is signed with HMAC-SHA256:

```text
X-Webhook-Signature: <hex-encoded HMAC-SHA256 digest of the request body>
```

Verification on the receiving end:

```typescript
function verifyWebhook(payload: string, signature: string, secret: string): boolean {
  const expected = crypto
    .createHmac('sha256', secret)
    .update(payload)
    .digest('hex');
  return crypto.timingSafeEqual(
    Buffer.from(signature),
    Buffer.from(expected)
  );
}
```

## Password Security

- Passwords are hashed with **bcrypt** (cost factor 10)
- Password reset tokens are single-use with expiration
- The setup endpoint is disabled after the first admin account is created

## Production Security Checklist

- [ ] Set strong, unique `JWT_SECRET` or use managed signing keys
- [ ] Enable TLS for all database and Redis connections
- [ ] Set `NODE_ENV=production`
- [ ] Configure CORS to restrict allowed origins
- [ ] Enable MFA for all admin accounts
- [ ] Rotate signing keys periodically
- [ ] Use separate database credentials for the platform DB and CDC target DB
- [ ] Review role assignments — apply principle of least privilege
- [ ] Monitor webhook delivery logs for unauthorized access attempts

---

## Admin UI Authentication

**URL:** https://docs.realtime.dev/docs/admin-ui/authentication

**Description:** Login flow, MFA, session management, RBAC, and user invite system.

The Admin UI uses session-based authentication with support for multi-factor authentication (MFA), role-based access control (RBAC), and a user invite system.

## Login Flow

## First-Time Setup

When no users exist in the system, the UI redirects to `/setup`:

1. Enter your name, email, and password
2. The system creates the first admin account with `super_admin` role
3.
You're automatically logged in ## Session Management - Sessions are stored in PostgreSQL with the user ID, token, and expiration - Session tokens are automatically attached to every API request - Logout invalidates the session server-side - Expired sessions redirect to the login page ## Role-Based Access Control Four built-in roles with hierarchical permissions: | Role | Topics | Schemas | Mappings | Webhooks | Users | Keys | |------|--------|---------|----------|----------|-------|------| | **super_admin** | CRUD | CRUD | CRUD | CRUD | CRUD | CRUD | | **admin** | CRUD | CRUD | CRUD | CRUD | CRUD | Read | | **editor** | CRUD | CRUD | CRUD | CRUD | Read | — | | **viewer** | Read | Read | Read | Read | — | — | ### Custom Roles Create custom roles with granular permissions via the **Roles** page (`/roles`). The permission matrix lets you check/uncheck specific resource × action combinations. ### Permission Resources - `topics`, `schemas`, `mappings`, `webhooks` - `users`, `roles`, `invites` - `keys`, `deployments` - `debugger`, `documents`, `socket` ### Permission Actions - `read`, `create`, `update`, `delete` ## Multi-Factor Authentication (MFA) MFA uses TOTP (Time-based One-Time Password) compatible with Google Authenticator, Authy, 1Password, etc. ### Setting Up MFA 1. Go to your user profile or have an admin navigate to **Security → Users** 2. Click **Setup MFA** 3. Scan the QR code with your authenticator app 4. Enter the 6-digit verification code to confirm 5. MFA is now active on your account ### Disabling MFA Admins can disable MFA for any user via the Users page. Users can also disable their own MFA through the profile settings. ## Invite System Admins can invite new users via email: ### Invite Statuses | Status | Description | |--------|-------------| | `pending` | Invite sent, not yet accepted | | `accepted` | User created their account | | `revoked` | Admin revoked the invite | | `expired` | Invite link expired | ## Password Reset 1. 
Click **Forgot Password** on the login page 2. Enter your email address 3. A reset token is generated (in production, this would be emailed) 4. Use the reset link to set a new password --- ## Admin UI Overview **URL:** https://docs.realtime.dev/docs/admin-ui/overview **Description:** React management dashboard for the Realtime Platform with 17+ pages for configuration and monitoring. The Admin UI is a React single-page application built with Material UI (MUI) that provides full graphical management of the Realtime Platform. No raw JSON editing required for standard workflows. ## Accessing the Admin UI **Development:** `http://localhost:5173` (Vite dev server) **Environment Variable:** `VITE_API_URL` — Backend API base URL (default: `http://localhost:3000`) ## First-Time Setup On first launch, the UI detects that no users exist and redirects to the **Setup** page where you create the initial admin account. ## Global Controls ### Application Selector The AppBar includes an **application selector** that scopes all data to the selected application. Every API call includes the `X-Application-Id` header automatically. ### Environment Selector A color-coded **environment badge** (Development / Staging / Production) persists across page navigations. Environment-aware pages respond to the current selection. | Environment | Color | |------------|-------| | Development | Blue | | Staging | Orange | | Production | Green | ### User Menu The AppBar displays the current user's name with a logout option. 
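The application and environment selectors boil down to two headers that accompany every API call. A sketch of such a header-building helper follows; it is hypothetical (the Admin UI's real HTTP client is internal), but the `X-Application-Id` and `X-Environment` header names come from the docs.

```typescript
// Hypothetical helper mirroring what the Admin UI does on every request:
// attach the session token plus application and environment scope headers.
type Environment = "development" | "staging" | "production";

interface ScopeContext {
  sessionToken: string;
  applicationId: string;
  environment: Environment;
}

function scopedHeaders(ctx: ScopeContext): Record<string, string> {
  return {
    Authorization: `Bearer ${ctx.sessionToken}`,
    "X-Application-Id": ctx.applicationId, // filters lists / tags creates per app
    "X-Environment": ctx.environment,      // selects that environment's data
    "Content-Type": "application/json",
  };
}

// Usage with fetch (VITE_API_URL is the backend base URL):
//   await fetch(`${apiUrl}/api/topics`, { headers: scopedHeaders(ctx) });
const headers = scopedHeaders({
  sessionToken: "sess_abc",
  applicationId: "app_abc",
  environment: "staging",
});
console.log(headers["X-Application-Id"]); // → app_abc
```

Because scoping is purely header-driven, switching applications or environments in the AppBar requires no route changes; the same endpoints simply return a different slice of data.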
## Navigation Sections The sidebar is organized into seven sections: ## Technology Stack | Technology | Purpose | |-----------|---------| | **React** | UI framework | | **Material UI (MUI)** | Component library | | **React Router** | Client-side routing | | **Vite** | Build tool and dev server | ## Running the Admin UI ```bash # Development (with hot reload) pnpm --filter @realtime/admin-ui dev # Production build pnpm --filter @realtime/admin-ui build # Serve from apps/admin-ui/dist/ ``` --- ## Admin UI Pages **URL:** https://docs.realtime.dev/docs/admin-ui/pages **Description:** Detailed guide to every page in the Realtime Platform Admin UI. The Admin UI contains 17+ pages organized into seven navigation sections. This guide covers every page and its capabilities. ## Overview ### Dashboard (`/`) Three metric panels showing real-time platform health: - **Realtime Delivery** — Active WebSocket connections, reconnect rate, events in/out per second, fanout latency p50/p95/p99 - **Database Pipeline** — CDC lag, outbox lag, events captured/routed per second, mapping and schema failures - **Async Reliability** — BullMQ queue depth, retry counts, DLQ size, webhook success/failure rates Plus a **Hot Topics** table showing the most active topics with events/sec, subscriber counts, and p95 fanout latency. ### Monitoring (`/monitoring`) Overall system health and individual service statuses. Shows backend API, workers, Redis, and PostgreSQL connectivity. ### Alert Center (`/alerts`) System alerts filterable by severity. Environment-aware — shows alerts scoped to the currently selected environment. 
## Registry ### Topics (`/topics`) Full CRUD for the topic registry: - Create topics with name, description, and owner via a dialog form - View all topics in a data table with status indicators - Edit topic metadata inline - Delete topics with confirmation ### Schemas (`/schemas`) Schema version browser and registration: - Browse schema versions by topic in a searchable list - Register new schemas with a JSON editor and compatibility mode selector - View schema diffs between versions - Validate payloads against the latest schema with an inline validation tool - Environment and application-scoped ### Mappings (`/mappings`) Database-to-topic mapping editor: - Create mappings with table, trigger columns, payload field mapping, and conditions - Visual rule builder for mapping conditions (no raw JSON required) - Version history for each mapping - Promote mappings between environments - Preview the emitted event payload ### Environment Grid (`/environments`) Multi-environment overview of all mappings: - Grid view showing Development, Staging, and Production columns - Color-coded sync status (green = synced, yellow = newer version available, red = missing) - Batch promotion across environments - Click any cell to see version details ### Deployments (`/deployments`) Deployment management for promoting configuration across environments: - Create deployments with entity picker (topics, mappings, webhooks) - Split-panel UI: deployment history (left) and detail panel (right) - Generate change reports showing what will be modified - Execute and rollback deployments - Inline comment system for audit trail ## Services ### Document Browser (`/documents`) Browse and manage sync documents by registered service: - **Service selector** — dropdown populated from registered services (via Services API) - **Manage Services** — gear icon opens inline CRUD dialog for creating, editing, and deleting service namespaces - View all documents for the selected service with data, revision, and 
timestamps - Edit document data with a JSON editor - Delete documents - Auto-loads documents when a service is selected - Scoped by current application and environment ### Document Inspector (`/documents/inspect`) Inspect and modify individual documents: - View full document JSON with syntax highlighting - Merge partial updates into documents - View revision history ### Socket Monitor (`/socket`) Three-tab WebSocket inspection dashboard: - **Connections** — expandable table of all active connections showing: transport (Socket.IO/WebSocket), remote address, auth subject, channel count, subscription count (with `returnMode` if set), and uptime. Expand to see user agent, joined channels, and full subscription details. - **Channels** — list all active channels with member counts, inspect members, and broadcast events from the UI - **Live Traffic** — SSE-powered real-time event stream showing connect/disconnect, subscribe/unsubscribe, and broadcast events with pause/resume and clear controls ## Webhooks ### Endpoints (`/webhooks`) Webhook endpoint management: - Add new endpoints with URL, secret, and event type selection - Activate/deactivate endpoints - Delete endpoints - View endpoint status and configuration ### Delivery Logs (`/webhooks/logs`) Webhook delivery history: - Filter logs by event type - View delivery status, response codes, and retry counts - Inspect delivery payloads ## Debugging ### Event Debugger (`/debugger`) Event trace timeline and routing inspection: - Timeline view of all events with topic filtering - Click any event to see routing details (subscribers matched, filters applied, delivery status) - Replay dialog for re-processing events ### Replay Tool (`/replay`) Replay historical events: - Load events from the trace store - Select replay mode (dry-run, router, websocket, webhook) - Apply overrides to the event payload before replay - View replay results ## Dashboards ### Realtime Delivery (`/dashboards/delivery`) Dedicated real-time delivery 
metrics:

- Active connections over time
- Events per second by source
- Fanout latency percentiles
- Hot topics breakdown

### Database Pipeline (`/dashboards/pipeline`)

Database event pipeline metrics:

- CDC lag and outbox lag
- Events captured vs routed per second
- Mapping failure rate
- Schema validation failure rate

### Async Reliability (`/dashboards/reliability`)

Worker and webhook reliability:

- BullMQ queue depth over time
- Retry counts and dead-letter queue size
- Webhook success/failure rates
- Worker heartbeat status

## Security

### Signing Keys (`/keys`)

JWT signing key management:

- View all keys with status (active/inactive)
- Create new signing keys
- Rotate keys (creates new + deactivates old atomically)
- Deactivate keys

### Users (`/users`)

User account management:

- CRUD table for user accounts
- Role assignment dialogs
- MFA management (setup, disable)
- Password management

### Roles (`/roles`)

Role and permission management:

- View built-in roles (super_admin, admin, editor, viewer)
- Create custom roles
- Permission matrix editor (resource × action checkbox grid)

### Invites (`/invites`)

User invite management:

- Create invites with email and role selection
- Copy invite links
- Track invite status (pending, accepted, revoked)
- Revoke and delete invites

---

## Admin API

**URL:** https://docs.realtime.dev/docs/api/admin

**Description:** Signing key management, user administration, roles, and invites.

The Admin API provides endpoints for managing JWT signing keys, users, roles, and invites. These endpoints require admin-level permissions.

## Signing Keys

### List Keys

```bash
GET /api/admin/keys
```

**Response:**

```json
[
  {
    "keyId": "key_abc123",
    "algorithm": "HS256",
    "active": true,
    "createdAt": 1710000000000
  }
]
```

### Create Key

```bash
POST /api/admin/keys
```

Generates a new signing key. The secret is returned only once.
### Deactivate Key ```bash POST /api/admin/keys/:id/deactivate ``` Deactivated keys can no longer sign new tokens but existing tokens remain valid until expiration. ### Rotate Key ```bash POST /api/admin/keys/:id/rotate ``` Creates a new key and deactivates the old one in a single atomic operation. ## Users ### List Users ```bash GET /api/users ``` ### Get User ```bash GET /api/users/:id ``` ### Create User ```bash POST /api/users ``` **Request Body:** ```json { "name": "Jane Smith", "email": "jane@example.com", "password": "secure-password" } ``` ### Update User ```bash PATCH /api/users/:id ``` ### Delete User ```bash DELETE /api/users/:id ``` ### Change Password ```bash POST /api/users/:id/change-password ``` ### MFA Management ```bash POST /api/users/:id/mfa/setup # Start MFA setup (returns QR code) POST /api/users/:id/mfa/confirm # Confirm MFA with TOTP code POST /api/users/:id/mfa/disable # Disable MFA ``` ### Role Assignments ```bash POST /api/users/:id/roles # Assign role DELETE /api/users/:id/roles/:roleId # Remove role ``` ## Roles ### List Roles ```bash GET /api/roles ``` **Built-in Roles:** | Role | Description | |------|-------------| | `super_admin` | Full system access | | `admin` | Manage topics, schemas, mappings, users | | `editor` | Create and modify topics, schemas, mappings | | `viewer` | Read-only access | ### Create Custom Role ```bash POST /api/roles ``` **Request Body:** ```json { "name": "webhook_manager", "description": "Can manage webhook endpoints", "permissions": [ { "resource": "webhooks", "action": "read" }, { "resource": "webhooks", "action": "create" }, { "resource": "webhooks", "action": "update" }, { "resource": "webhooks", "action": "delete" } ] } ``` ### Update / Delete Role ```bash PATCH /api/roles/:id DELETE /api/roles/:id ``` ## Invites ### List Invites ```bash GET /api/invites ``` ### Create Invite ```bash POST /api/invites ``` **Request Body:** ```json { "email": "new-user@example.com", "roleId": "role_editor" } ``` ### 
Accept Invite ```bash POST /api/invites/accept ``` **Request Body:** ```json { "token": "invite_token_abc123", "name": "New User", "password": "secure-password" } ``` ### Revoke / Delete Invite ```bash POST /api/invites/:id/revoke DELETE /api/invites/:id ``` ### Get Invite by Token ```bash GET /api/invites/by-token/:token ``` Used by the invite acceptance page to display invite details. --- ## Auth API **URL:** https://docs.realtime.dev/docs/api/auth-endpoints **Description:** Token issuance, verification, user authentication, session management, and MFA. The Auth API handles JWT token operations and user authentication with session-based login, MFA, password management, and first-time setup. ## JWT Token Endpoints ### Issue Token ```bash POST /auth/token ``` **Request Body:** ```json { "applicationId": "app_abc123", "permissions": ["subscribe", "publish"] } ``` **Response:** ```json { "token": "eyJhbGciOiJIUzI1NiIs...", "expiresAt": 1710003600000 } ``` ### Verify Token ```bash POST /auth/verify ``` **Request Body:** ```json { "token": "eyJhbGciOiJIUzI1NiIs..." } ``` **Response (valid):** ```json { "valid": true, "claims": { "sub": "user123", "aud": "app_abc123", "exp": 1710003600, "permissions": ["subscribe", "publish"] } } ``` **Response (invalid):** `401 Unauthorized` ## User Authentication ### Login ```bash POST /api/auth/login ``` **Request Body:** ```json { "email": "admin@example.com", "password": "your-password" } ``` **Response (no MFA):** ```json { "sessionToken": "sess_abc123...", "user": { "id": "usr_abc", "email": "admin@example.com", "name": "Admin" } } ``` **Response (MFA required):** ```json { "mfaRequired": true, "mfaToken": "mfa_pending_abc123" } ``` ### MFA Verify ```bash POST /api/auth/mfa/verify ``` **Request Body:** ```json { "mfaToken": "mfa_pending_abc123", "code": "123456" } ``` ### Logout ```bash POST /api/auth/logout ``` Invalidates the current session. 
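As shown above, `POST /api/auth/login` returns one of two shapes depending on whether MFA is enabled. A hypothetical client-side sketch (not SDK code) of branching on the response:

```typescript
// The two documented response shapes of POST /api/auth/login.
type LoginResponse =
  | { sessionToken: string; user: { id: string; email: string; name: string } }
  | { mfaRequired: true; mfaToken: string };

// Decide the next step: submit a TOTP code to POST /api/auth/mfa/verify,
// or use the session token directly.
function nextLoginStep(res: LoginResponse): 'verify-mfa' | 'authenticated' {
  return 'mfaRequired' in res ? 'verify-mfa' : 'authenticated';
}

nextLoginStep({ mfaRequired: true, mfaToken: 'mfa_pending_abc123' }); // → 'verify-mfa'
```

When the result is `'verify-mfa'`, pass the `mfaToken` along with the user's 6-digit code to `POST /api/auth/mfa/verify`.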
### Get Current User ```bash GET /api/auth/me ``` Returns the currently authenticated user's profile. ## Password Management ### Forgot Password ```bash POST /api/auth/forgot-password ``` **Request Body:** ```json { "email": "admin@example.com" } ``` ### Reset Password ```bash POST /api/auth/reset-password ``` **Request Body:** ```json { "token": "reset_token_abc123", "password": "new-secure-password" } ``` ## First-Time Setup ### Check Setup Status ```bash GET /api/auth/setup-status ``` Returns whether the platform has been initialized with a first admin account. ```json { "setupRequired": true } ``` ### Create First Admin ```bash POST /api/auth/setup ``` **Request Body:** ```json { "name": "Admin User", "email": "admin@example.com", "password": "secure-password" } ``` --- ## Database API **URL:** https://docs.realtime.dev/docs/api/database **Description:** Manage CDC mappings, versions, promotions, and database subscriptions. The Database API manages domain mappings (table-to-topic configurations), mapping versions, environment promotion, and database subscriptions. ## Mappings ### List Mappings ```bash GET /api/database/mappings ``` Returns all mappings scoped to the current application and environment. 
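Mapping payload templates reference row columns with a `$row.` prefix (see the Create Mapping request body below). As a hedged sketch of how such a template could be expanded against a changed row — assuming only simple top-level column references, not the platform's actual template engine:

```typescript
type Row = Record<string, unknown>;

// Hypothetical expansion of a mapping payload template like
// { "sessionId": "$row.id" } against the captured row.
function expandPayload(template: Record<string, string>, row: Row): Row {
  const out: Row = {};
  for (const [key, expr] of Object.entries(template)) {
    // "$row.<column>" resolves to that column's value; anything else is literal.
    out[key] = expr.startsWith('$row.') ? row[expr.slice('$row.'.length)] : expr;
  }
  return out;
}

expandPayload(
  { sessionId: '$row.id', status: '$row.status' },
  { id: 'abc123', status: 'active' }
);
// → { sessionId: 'abc123', status: 'active' }
```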
### Get Mapping ```bash GET /api/database/mappings/:id ``` ### Create Mapping ```bash POST /api/database/mappings ``` **Request Body:** ```json { "table": "sessions", "topic": "session.status", "events": ["insert", "update"], "triggerColumns": ["status", "ended_at"], "payload": { "sessionId": "$row.id", "status": "$row.status", "endedAt": "$row.ended_at" }, "when": { "or": [ { "changed": "status" }, { "changed": "ended_at" } ] }, "environment": "development" } ``` ### Update Mapping ```bash PATCH /api/database/mappings/:id ``` ### Delete Mapping ```bash DELETE /api/database/mappings/:id ``` ### Get Mappings by Topic ```bash GET /api/database/mappings/topic/:topic ``` ### Environment Status ```bash GET /api/database/mappings/status ``` Returns the promotion status of all mappings across environments. Used by the Environment Grid UI. **Response:** ```json [ { "id": "map_123", "table": "sessions", "topic": "session.status", "environments": { "development": { "version": 3, "active": true }, "staging": { "version": 2, "active": true }, "production": { "version": 1, "active": true } } } ] ``` ## Versions ### List Versions ```bash GET /api/database/mappings/:id/versions ``` ### Create Version Snapshot ```bash POST /api/database/mappings/:id/versions ``` **Request Body:** ```json { "environment": "development" } ``` ### Promote Mapping ```bash POST /api/database/mappings/:id/promote ``` Copies the source environment's latest version into the next environment (dev → staging, staging → prod). 
**Request Body:** ```json { "environment": "staging" } ``` ## Subscriptions ### List Subscriptions ```bash GET /api/database/subscriptions ``` ### Create Subscription ```bash POST /api/database/subscriptions ``` **Request Body:** ```json { "mappingId": "map_123" } ``` ### Activate / Deactivate ```bash POST /api/database/subscriptions/:id/activate POST /api/database/subscriptions/:id/deactivate ``` ### Delivery Logs ```bash GET /api/database/subscriptions/logs?event=database.update ``` --- ## Debugger API **URL:** https://docs.realtime.dev/docs/api/debugger **Description:** Query event traces, debug routing and filters, and replay historical events. The Debugger API provides tools for inspecting the event pipeline — trace events, debug routing decisions, test subscription filters, and replay historical events. ## Query Traces ```bash GET /api/debugger/traces?topic=session.status&source=database&limit=50&offset=0 ``` **Query Parameters:** | Param | Required | Description | |-------|----------|-------------| | `topic` | No | Filter by topic | | `source` | No | Filter by source (`database`, `sync`, `socket`) | | `limit` | No | Max results (default 50) | | `offset` | No | Pagination offset | **Response:** ```json [ { "traceId": "trace_abc123", "eventId": "evt_a1b2c3d4", "topic": "session.status", "source": "database", "type": "database.update", "payload": { "sessionId": "abc123", "status": "active" }, "timestamp": 1710000000000, "routingResult": { "subscriberCount": 3, "deliveredCount": 3, "filteredCount": 0 } } ] ``` ## Get Trace Detail ```bash GET /api/debugger/traces/:traceId ``` Returns the full trace including routing decisions, filter results, and delivery status. 
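The debug endpoints below exercise the router's filter semantics: plain values match by equality and `{ "in": [...] }` matches set membership. A hedged sketch of that matching logic (illustrative, not the router's actual code):

```typescript
// Evaluate a subscription filter against an event payload.
// Plain values use strict equality; { in: [...] } checks membership.
function matchesFilter(
  payload: Record<string, unknown>,
  filter: Record<string, unknown>
): boolean {
  return Object.entries(filter).every(([key, expected]) => {
    const actual = payload[key];
    if (expected !== null && typeof expected === 'object' && 'in' in expected) {
      return (expected as { in: unknown[] }).in.includes(actual);
    }
    return actual === expected;
  });
}

matchesFilter(
  { sessionId: 'abc123', status: 'active', campusId: 12 },
  { campusId: 12, status: { in: ['active', 'pending'] } }
); // → true
```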
## Debug Routing Test how an event would be routed without actually publishing it: ```bash POST /api/debugger/debug-routing ``` **Request Body:** ```json { "event": { "topic": "session.status", "source": "database", "type": "database.update", "payload": { "sessionId": "abc123", "status": "active" } } } ``` **Response:** ```json { "topic": "session.status", "activeSubscriptions": 5, "matchingSubscriptions": 3, "routingDetails": [ { "subscriptionId": "sub_1", "matched": true, "filter": { "campusId": 12 } }, { "subscriptionId": "sub_2", "matched": false, "filter": { "campusId": 99 } } ] } ``` ## Debug Filter Test a specific filter against an event: ```bash POST /api/debugger/debug-filter ``` **Request Body:** ```json { "event": { "topic": "session.status", "payload": { "sessionId": "abc123", "status": "active", "campusId": 12 } }, "filter": { "campusId": 12, "status": { "in": ["active", "pending"] } } } ``` **Response:** ```json { "matched": true, "details": { "campusId": { "matched": true, "expected": 12, "actual": 12 }, "status": { "matched": true, "expected": ["active", "pending"], "actual": "active" } } } ``` ## Replay Event Replay a historical event from the trace store: ```bash POST /api/debugger/replay ``` **Request Body:** ```json { "traceId": "trace_abc123", "mode": "dry-run", "overrides": { "payload": { "status": "ended" } } } ``` **Replay Modes:** | Mode | Description | |------|-------------| | `dry-run` | Simulate routing without delivery | | `router` | Re-route through the event router | | `websocket` | Re-deliver to WebSocket subscribers | | `webhook` | Re-deliver to webhook endpoints | --- ## API Overview **URL:** https://docs.realtime.dev/docs/api/overview **Description:** Complete REST API and WebSocket event reference for the Realtime Platform. The Realtime Platform exposes a REST API on the backend server (default port 3000) and a WebSocket gateway via Socket.IO on the same port. 
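Requests to the REST API combine the authentication and scoping headers described in this section. A hypothetical helper (not part of the SDK; all values are placeholders) for assembling them:

```typescript
// Build the Authorization, X-Application-Id, and optional X-Environment
// headers used by REST requests to the platform.
function buildHeaders(opts: {
  token: string;
  applicationId: string;
  environment?: string;
}): Record<string, string> {
  const headers: Record<string, string> = {
    Authorization: `Bearer ${opts.token}`,
    'X-Application-Id': opts.applicationId,
  };
  if (opts.environment) headers['X-Environment'] = opts.environment;
  return headers;
}

buildHeaders({ token: 'eyJhbGciOi...', applicationId: 'app_abc123', environment: 'staging' });
// → { Authorization: 'Bearer eyJhbGciOi...', 'X-Application-Id': 'app_abc123', 'X-Environment': 'staging' }
```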
## Base URL ```text http://localhost:3000 ``` All REST endpoints are prefixed with `/api/` except health and auth routes. ## Authentication Most API endpoints require authentication via one of: - **JWT Bearer Token** — `Authorization: Bearer <token>` header - **Session Token** — `Authorization: <session-token>` or `X-Session-Token: <session-token>` header The Admin UI uses session-based authentication. Service-to-service calls use JWT tokens. ## Application Scoping All entity-owning endpoints are scoped by application. The `X-Application-Id` header identifies which application's data to access: ```bash curl http://localhost:3000/api/topics \ -H 'Authorization: Bearer <token>' \ -H 'X-Application-Id: app_abc123' ``` The Admin UI auto-attaches this header from the currently selected application. ## Environment Scoping Many endpoints also respect the `X-Environment` header for environment-specific data: ```bash curl http://localhost:3000/api/schemas/session.status/latest \ -H 'X-Application-Id: app_abc123' \ -H 'X-Environment: staging' ``` ## Response Format All API responses return JSON. Successful responses return the resource directly. Errors return: ```json { "error": "NotFoundError", "message": "Topic not found: session.status" } ``` ## Common HTTP Status Codes | Code | Meaning | |------|---------| | `200` | Success | | `201` | Created | | `400` | Bad Request — invalid input | | `401` | Unauthorized — missing or invalid auth | | `403` | Forbidden — insufficient permissions | | `404` | Not Found | | `409` | Conflict — optimistic concurrency failure | | `500` | Internal Server Error | ## Health Check ```bash GET /health ``` Returns `{"status":"ok"}` — no authentication required. ## Metrics ```bash GET /api/metrics/prometheus ``` Returns Prometheus-format metrics for scraping. No authentication required. ## API Sections --- ## Schemas API **URL:** https://docs.realtime.dev/docs/api/schemas **Description:** Register versioned JSON Schemas, validate payloads, and check compatibility.
The Schemas API manages versioned JSON Schema definitions for domain topics. Each topic can have multiple schema versions with backward compatibility enforcement. ## List Schema Versions ```bash GET /api/schemas/:topic/versions ``` Returns all schema versions for a topic, scoped by application and environment. **Example:** ```bash curl http://localhost:3000/api/schemas/session.status/versions ``` **Response:** ```json [ { "topic": "session.status", "version": 1, "schema": { "type": "object", "properties": { ... } }, "compatibilityMode": "backward", "createdAt": 1710000000000 }, { "topic": "session.status", "version": 2, "schema": { "type": "object", "properties": { ... } }, "compatibilityMode": "backward", "createdAt": 1710001000000 } ] ``` ## Get Specific Version ```bash GET /api/schemas/:topic/versions/:version ``` ```bash curl http://localhost:3000/api/schemas/session.status/versions/2 ``` ## Get Latest Schema ```bash GET /api/schemas/:topic/latest ``` Returns the most recent schema version for a topic. ## Register Schema ```bash POST /api/schemas/:topic ``` **Request Body:** ```json { "schema": { "type": "object", "properties": { "sessionId": { "type": "string" }, "status": { "type": "string" }, "campusId": { "type": "number" } }, "required": ["sessionId", "status"] }, "compatibilityMode": "backward" } ``` The version number is assigned automatically (increments from the latest version). **Compatibility Modes:** | Mode | Description | |------|-------------| | `backward` | New schema must be able to read data written by the previous version. Default. | | `none` | No compatibility check — any schema is accepted. | If backward compatibility fails, the registration is rejected with a detailed error message. **Response:** `201 Created` with the new schema definition. ## Validate Payload ```bash POST /api/schemas/:topic/validate ``` Validates a payload against the latest schema for a topic. 
**Request Body:** ```json { "payload": { "sessionId": "abc123", "status": "active", "campusId": 12 } } ``` **Response (valid):** ```json { "valid": true } ``` **Response (invalid):** ```json { "valid": false, "errors": [ { "path": "/sessionId", "message": "must be string" } ] } ``` ## Compatibility Checking When registering a new schema with `compatibilityMode: "backward"`, the system checks that: - No required fields are removed - No field types are changed in incompatible ways - Existing consumers can still read payloads produced by the new schema If the check fails, you receive a 400 error with details about which fields are incompatible. --- ## Services API **URL:** https://docs.realtime.dev/docs/api/services **Description:** CRUD operations for managing sync document service namespaces. The Services API manages registered service namespaces that own sync documents. Services provide formal identifiers for grouping related documents instead of ad-hoc string names. All endpoints are scoped by the `X-Application-Id` header. ## List Services ```bash GET /api/services ``` **Response:** `200 OK` ```json { "services": [ { "id": "a1b2c3d4-...", "name": "session", "description": "Session state documents", "applicationId": "app_abc123", "createdAt": "2024-01-15T10:00:00.000Z", "updatedAt": "2024-01-15T10:00:00.000Z" } ] } ``` ## Get Service ```bash GET /api/services/:id ``` **Response:** `200 OK` ```json { "id": "a1b2c3d4-...", "name": "session", "description": "Session state documents", "applicationId": "app_abc123", "createdAt": "2024-01-15T10:00:00.000Z", "updatedAt": "2024-01-15T10:00:00.000Z" } ``` ## Create Service ```bash POST /api/services ``` **Request Body:** ```json { "name": "session", "description": "Session state documents" } ``` | Field | Type | Required | Description | |-------|------|----------|-------------| | `name` | `string` | Yes | Unique service name. Must match `^[a-z][a-z0-9_-]*$`. | | `description` | `string` | No | Human-readable description. 
| **Response:** `201 Created` ```json { "id": "a1b2c3d4-...", "name": "session", "description": "Session state documents", "applicationId": "app_abc123", "createdAt": "2024-01-15T10:00:00.000Z", "updatedAt": "2024-01-15T10:00:00.000Z" } ``` ## Update Service ```bash PATCH /api/services/:id ``` **Request Body:** ```json { "name": "session-v2", "description": "Updated description" } ``` Both fields are optional — only included fields are updated. **Response:** `200 OK` — returns the updated service object. ## Delete Service ```bash DELETE /api/services/:id ``` **Response:** `204 No Content` ## Error Responses | Status | Meaning | |--------|---------| | `400` | Validation error — invalid name format | | `404` | Service not found | | `409` | Conflict — a service with that name already exists in this application | ## Service Name Rules Service names must: - Start with a lowercase letter - Contain only lowercase letters, digits, hyphens, and underscores - Be unique per application Valid examples: `session`, `order-tracking`, `chat_rooms` Invalid examples: `Session`, `123abc`, `my service` --- ## Socket API **URL:** https://docs.realtime.dev/docs/api/socket **Description:** Channel management, broadcasting, and connection status endpoints. The Socket API provides REST endpoints for managing WebSocket channels and broadcasting messages server-side. ## List Channels ```bash GET /api/socket/channels ``` Returns all currently active channels with member counts. 
**Response:** ```json [ { "name": "chat:room1", "memberCount": 5 }, { "name": "notifications:user123", "memberCount": 1 } ] ``` ## Channel Members ```bash GET /api/socket/channels/:channel/members ``` **Example:** ```bash curl http://localhost:3000/api/socket/channels/chat:room1/members ``` **Response:** ```json [ { "socketId": "abc123", "joinedAt": 1710000000000 }, { "socketId": "def456", "joinedAt": 1710000100000 } ] ``` ## Connection Count ```bash GET /api/socket/connections ``` **Response:** ```json { "count": 42 } ``` ## Broadcast Send a message to all members of a channel via REST (useful for server-side broadcasting): ```bash POST /api/socket/broadcast ``` **Request Body:** ```json { "channel": "chat:room1", "event": "message", "data": { "text": "Server announcement", "sender": "system" } } ``` ## WebSocket Events These events are used over the Socket.IO WebSocket connection (not REST): | Event | Direction | Payload | Description | |-------|-----------|---------|-------------| | `subscribe` | Client → Server | `{ topic, filter?, returnMode? }` | Subscribe to a domain topic. `returnMode` controls sync event payload shape (`'full'`, `'diff'`, or `'both'`). | | `unsubscribe` | Client → Server | `{ subscriptionId }` | Unsubscribe from a topic | | `event` | Server → Client | `RealtimeEvent` | Delivered event matching subscription | ### Plain WebSocket Protocol For plain WebSocket clients connected on `/ws`, the JSON message protocol is: **Client → Server:** ```json { "type": "subscribe", "topic": "sync.session.updated", "filter": {}, "returnMode": "diff" } ``` ```json { "type": "unsubscribe", "subscriptionId": "sub_abc123" } ``` ```json { "type": "ping" } ``` **Server → Client:** ```json { "type": "subscribed", "subscriptionId": "sub_abc123", "topic": "sync.session.updated" } ``` ```json { "type": "event", "topic": "sync.session.updated", "data": { ... 
} } ``` ```json { "type": "pong" } ``` --- ## Sync API **URL:** https://docs.realtime.dev/docs/api/sync **Description:** Document CRUD operations with revision tracking and optimistic concurrency. The Sync API provides RESTful document management with automatic revision tracking. Documents are stored in RedisJSON (with write-through persistence to Postgres, as described in Architecture) and organized by service namespace. ## Create Document ```bash POST /api/sync/:service/documents ``` **Request Body:** ```json { "id": "optional-custom-id", "data": { "status": "active", "participantCount": 0 }, "expiresAt": 1710100000000 } ``` All fields except `data` are optional. If `id` is omitted, a unique prefixed ID is generated automatically. **Response:** `201 Created` ```json { "service": "session", "id": "doc_a1b2c3d4", "revision": 1, "data": { "status": "active", "participantCount": 0 }, "createdAt": 1710000000000, "updatedAt": 1710000000000 } ``` ## Get Document ```bash GET /api/sync/:service/documents/:id ``` **Response:** ```json { "service": "session", "id": "doc_a1b2c3d4", "revision": 3, "data": { "status": "active", "participantCount": 5 }, "createdAt": 1710000000000, "updatedAt": 1710001000000 } ``` ## Replace Document Replace the entire document data. Increments revision. ```bash PUT /api/sync/:service/documents/:id ``` **Request Body:** ```json { "data": { "status": "ended", "endedAt": 1710002000000 }, "expectedRevision": 3 } ``` The optional `expectedRevision` enables optimistic concurrency — if the document's current revision doesn't match, a `409 Conflict` is returned. ## Merge Document Deep merge partial data into the existing document. ```bash PATCH /api/sync/:service/documents/:id/merge ``` **Request Body:** ```json { "data": { "notes": "Session completed" }, "expectedRevision": 3 } ``` Only the specified fields are updated; all other fields remain unchanged. ## Mutate Document Apply structured mutations to the document.
```bash POST /api/sync/:service/documents/:id/mutate ``` **Request Body:** ```json { "mutations": [ { "op": "set", "path": "/status", "value": "ended" }, { "op": "increment", "path": "/participantCount", "value": -1 } ], "expectedRevision": 4 } ``` ## Delete Document Soft-delete a document (sets `deleted: true`). ```bash DELETE /api/sync/:service/documents/:id ``` **Response:** `200 OK` ## Error Responses | Status | Meaning | |--------|---------| | `404` | Document not found | | `409` | Revision conflict (expectedRevision mismatch) | | `400` | Invalid request body | --- ## Topics API **URL:** https://docs.realtime.dev/docs/api/topics **Description:** Create, read, update, and delete domain topics in the Topic Registry. The Topics API manages domain topics — the named channels that clients subscribe to for receiving real-time events. ## List Topics ```bash GET /api/topics ``` Returns all topics scoped to the current application and environment. **Response:** ```json [ { "name": "session.status", "description": "Session status changes", "owner": "platform-team", "activeSchemaVersion": 2, "applicationId": "app_abc123", "environment": "development", "createdAt": 1710000000000 } ] ``` ## Get Topic ```bash GET /api/topics/:name ``` **Example:** ```bash curl http://localhost:3000/api/topics/session.status ``` ## Create Topic ```bash POST /api/topics ``` **Request Body:** ```json { "name": "session.status", "description": "Session status changes", "owner": "platform-team" } ``` **Validation Rules:** - `name` — required, dot-separated, lowercase (e.g. `session.status`, `order.updated`) - `description` — required, non-empty string - `owner` — required, identifies the owning team or service **Response:** `201 Created` with the new topic object. 
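The naming rule in the validation list above (dot-separated, lowercase) can be expressed as a regex. A hedged sketch — the actual `TopicValidator` pattern may differ in detail:

```typescript
// Lowercase letters, digits, and dots, with at least one non-empty segment.
const TOPIC_PATTERN = /^[a-z0-9]+(\.[a-z0-9]+)*$/;

const isValidTopicName = (name: string): boolean => TOPIC_PATTERN.test(name);

isValidTopicName('session.status');  // → true
isValidTopicName('Session.Status');  // → false (uppercase)
isValidTopicName('order..updated');  // → false (empty segment)
```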
## Update Topic ```bash PATCH /api/topics/:name ``` **Request Body** (all fields optional): ```json { "description": "Updated description", "owner": "new-team", "activeSchemaVersion": 3 } ``` ## Delete Topic ```bash DELETE /api/topics/:name ``` **Response:** `200 OK` ## Topic Naming Convention Topics use dot-separated, lowercase names that follow a domain-driven convention: ```text session.status order.updated chat.room.message proctor.session.started user.profile.changed ``` The `TopicValidator` enforces this format — names must match the pattern: lowercase letters, numbers, and dots, with at least one segment. --- ## Webhooks API **URL:** https://docs.realtime.dev/docs/api/webhooks **Description:** Manage webhook endpoints, delivery logs, and HMAC-signed event dispatch. The Webhooks API manages HTTP callback endpoints that receive real-time events. Webhooks are HMAC-signed for security and support automatic retries with exponential backoff. ## List Endpoints ```bash GET /api/webhooks ``` Returns all webhook endpoints scoped to the current application. 
**Response:** ```json [ { "id": "wh_abc123", "url": "https://example.com/webhook", "events": ["database.event", "document.updated"], "active": true, "applicationId": "app_abc123", "createdAt": 1710000000000 } ] ``` ## Create Endpoint ```bash POST /api/webhooks ``` **Request Body:** ```json { "url": "https://example.com/webhook", "secret": "your-webhook-secret", "events": ["database.event", "document.updated"] } ``` | Field | Required | Description | |-------|----------|-------------| | `url` | Yes | HTTPS endpoint URL | | `secret` | Yes | Shared secret for HMAC-SHA256 signing | | `events` | Yes | Array of event types to deliver | **Supported Event Types:** - `document.updated` — Sync document updated - `document.deleted` — Sync document deleted - `presence.updated` — Presence change - `socket.message` — Socket message - `socket.connect` — Client connected - `socket.disconnect` — Client disconnected - `database.event` — Database CDC event ## Activate / Deactivate ```bash POST /api/webhooks/:id/activate POST /api/webhooks/:id/deactivate ``` Deactivated endpoints stop receiving deliveries but are not deleted. ## Delete Endpoint ```bash DELETE /api/webhooks/:id ``` ## Delivery Logs ```bash GET /api/webhooks/logs?event=database.event ``` Returns the delivery history with status, response codes, and retry counts. **Response:** ```json [ { "id": "log_abc123", "webhookId": "wh_abc123", "event": "database.event", "url": "https://example.com/webhook", "statusCode": 200, "success": true, "attempts": 1, "deliveredAt": 1710000100000 } ] ``` ## Webhook Signing Every webhook delivery includes an HMAC-SHA256 signature for verification. 
The following headers are sent with each request: | Header | Description | |--------|-------------| | `X-Webhook-Signature` | Signature in `t=<timestamp>,v1=<signature>` format | | `X-Webhook-Timestamp` | Signing timestamp in milliseconds | | `X-Webhook-Endpoint-Id` | The endpoint ID that matched this delivery | | `Content-Type` | `application/json` | The signed content is `${timestamp}.${rawBody}` — the timestamp concatenated with a dot and the raw JSON body. Including the timestamp prevents replay attacks. ### Using the SDK (Recommended) The SDK includes a built-in webhook receiver that handles signature verification, replay protection, and payload parsing. See the [Webhook Receiver SDK guide](/docs/sdk/webhook-receiver) for full details. ```typescript const receiver = new WebhookReceiver('whsec_your_secret'); app.post('/webhooks', express.text({ type: '*/*' }), (req, res) => { try { const { event } = receiver.verify(req.body, req.headers); console.log('Verified event:', event.topic, event.type); res.sendStatus(200); } catch (err) { res.sendStatus(400); } }); ``` ### Manual Verification If you cannot use the SDK, verify signatures manually: ```typescript import { createHmac, timingSafeEqual } from 'node:crypto'; function verifyWebhook( rawBody: string, signatureHeader: string, secret: string, toleranceMs = 300_000 ): boolean { // Parse header: t=<timestamp>,v1=<signature> const parts = signatureHeader.split(','); const tPart = parts.find((p) => p.startsWith('t=')); const vPart = parts.find((p) => p.startsWith('v1=')); if (!tPart || !vPart) return false; const timestamp = parseInt(tPart.slice(2), 10); const receivedSig = vPart.slice(3); // Reject expired timestamps (replay protection) if (Math.abs(Date.now() - timestamp) > toleranceMs) return false; // Compute expected signature const expected = createHmac('sha256', secret) .update(`${timestamp}.${rawBody}`) .digest('hex'); // Constant-time comparison if (expected.length !== receivedSig.length) return false; return timingSafeEqual(Buffer.from(expected), Buffer.from(receivedSig)); } ``` ## Retry Policy Failed
deliveries are retried automatically using BullMQ: - **Max retries:** 5 - **Backoff:** Exponential with jitter - **Dead-letter queue:** Failed deliveries after max retries are moved to a DLQ for manual inspection ## Delivery Payload ```json { "id": "evt_a1b2c3d4", "topic": "session.status", "source": "database", "type": "database.update", "payload": { "sessionId": "abc123", "status": "active" }, "metadata": { "timestamp": 1710000000000, "table": "sessions", "operation": "update" } } ``` --- ## Deployment **URL:** https://docs.realtime.dev/docs/operations/deployment **Description:** Deploy the Realtime Platform to production with Docker, process managers, or cloud services. This guide covers deploying the Realtime Platform to production environments. ## Build for Production ```bash pnpm build ``` Turborepo builds all packages in dependency order. The output is: | App | Output | Type | |-----|--------|------| | Backend | `apps/backend/dist/` | Node.js server | | Workers | `apps/workers/dist/` | Node.js processes | | Admin UI | `apps/admin-ui/dist/` | Static files | ## Running in Production ### Backend API ```bash NODE_ENV=production pnpm --filter @realtime/backend start ``` The backend starts the Express HTTP server with Socket.IO on the configured `PORT` (default 3000). Migrations run automatically on startup. ### Workers ```bash NODE_ENV=production pnpm --filter @realtime/workers start ``` Workers run as a long-lived Node.js process. They connect to Redis (BullMQ) and optionally to the CDC target database. ### Admin UI The Admin UI builds to static files in `apps/admin-ui/dist/`. Serve with any static file server. Set `VITE_API_URL` at build time to point to your production backend: ```bash VITE_API_URL=https://api.yourplatform.com pnpm --filter @realtime/admin-ui build ``` ## Docker Deployment ### Example Dockerfile (Backend) ```dockerfile FROM node:20-alpine AS builder WORKDIR /app COPY . .
RUN npm install -g pnpm@9 RUN pnpm install --frozen-lockfile RUN pnpm build FROM node:20-alpine WORKDIR /app COPY --from=builder /app . ENV NODE_ENV=production EXPOSE 3000 CMD ["node", "apps/backend/dist/index.js"] ``` ### Example Dockerfile (Workers) ```dockerfile FROM node:20-alpine AS builder WORKDIR /app COPY . . RUN npm install -g pnpm@9 RUN pnpm install --frozen-lockfile RUN pnpm build FROM node:20-alpine WORKDIR /app COPY --from=builder /app . ENV NODE_ENV=production CMD ["node", "apps/workers/dist/index.js"] ``` ## Infrastructure Requirements | Service | Production Recommendation | |---------|--------------------------| | **PostgreSQL** | Managed service (RDS, Cloud SQL) with connection pooling | | **Redis** | Managed service (ElastiCache, Redis Cloud) with RedisJSON | | **Backend** | 2+ instances behind a load balancer | | **Workers** | 1+ instances (stateless, horizontally scalable) | ## Environment Variables Ensure all required environment variables are set in production: ```bash NODE_ENV=production PORT=3000 LOG_LEVEL=info DATABASE_URL=postgresql://user:pass@db-host:5432/realtime REDIS_URL=redis://user:pass@redis-host:6379 CDC_DATABASE_URL=postgresql://user:pass@app-db:5432/yourapp ``` ## Health Checks Configure your load balancer or orchestrator to use the health endpoint: ```bash GET /health # Returns {"status":"ok"} with 200 ``` ## Horizontal Scaling The platform is designed for horizontal scaling: - **Backend instances** share state through Redis and PostgreSQL — add more behind a load balancer - **Workers** are stateless and process jobs from BullMQ — add more to increase throughput - **Socket.IO** uses Redis Pub/Sub adapter for cross-instance message delivery - **Sticky sessions** are NOT required — Socket.IO handles reconnection across instances ## SSL/TLS In production, terminate TLS at the load balancer or reverse proxy. 
Configure Redis and PostgreSQL connections with TLS: ```bash REDIS_URL=rediss://user:pass@redis-host:6379 DATABASE_URL=postgresql://user:pass@db-host:5432/realtime?sslmode=require ``` --- ## Database Migrations **URL:** https://docs.realtime.dev/docs/operations/migrations **Description:** Knex migration system, auto-migration on startup, manual commands, and writing new migrations. The Realtime Platform uses **Knex** for database migrations, managed in the `@realtime/database` package. Migrations run automatically on backend startup and can also be run manually. ## Auto-Migration on Startup The backend calls `runMigrations()` before starting the HTTP server. This: 1. Creates the `knex_migrations` tracking table if it doesn't exist 2. Applies any pending migrations in order 3. Skips already-applied migrations (idempotent) 4. Logs which migrations were applied If the database is unreachable, the backend logs a warning and starts anyway. ## Manual Commands ```bash # Apply all pending migrations pnpm --filter @realtime/database migrate:latest # View migration status pnpm --filter @realtime/database migrate:status # Rollback the last batch pnpm --filter @realtime/database migrate:rollback # Create a new migration file pnpm --filter @realtime/database migrate:make add_new_table ``` ## Migration Files Located at `packages/database/src/migrations/`: | Migration | Description | |-----------|-------------| | `20240101000001_event_outbox` | Transactional outbox table with partial index on unprocessed rows | | `20240101000002_signing_keys` | JWT signing key storage with rotation support | | `20240101000003_topics` | Topic registry table | | `20240101000004_schemas` | Versioned schema definitions with topic FK and compatibility mode | | `20240101000005_mappings` | Domain mappings and mapping version promotion tables | | `20240101000006_webhooks` | Webhook endpoints and delivery log with indexed history | | `20240101000007_event_traces` | Event trace storage for the debugger | | 
`20240101000008_applications` | Multi-tenant applications table | | `20240101000009_add_application_id` | Adds `application_id` column to all entity tables | | `20240101000010_environments` | Environment-scoped fields | | `20240101000011_deployments` | Deployment management tables (deployments, items, comments) | | `20240101000012_user_auth` | Users, roles, permissions, sessions, invites, password reset tokens | Each migration has `up()` and `down()` functions for forward and rollback. ## Writing a New Migration ## Knex Configuration The Knex config (`packages/database/src/knexfile.ts`) reads connection details from environment variables: **Connection priority:** 1. `DATABASE_URL` (full connection string) 2. Individual `POSTGRES_*` variables **Connection pool:** min 2, max 10 ## Best Practices - Always implement both `up()` and `down()` for every migration - Use `createTableIfNotExists` / `dropTableIfExists` for safety - Never modify a migration that has already been applied to production - Create a new migration for schema changes instead - Test migrations with `migrate:latest` followed by `migrate:rollback` to verify both directions work --- ## Monitoring **URL:** https://docs.realtime.dev/docs/operations/monitoring **Description:** Prometheus metrics, health checks, operational dashboards, and alerting. The Realtime Platform provides comprehensive monitoring through Prometheus-compatible metrics, health endpoints, and built-in operational dashboards. ## Health Check ```bash GET /health ``` Returns `{"status":"ok"}` with HTTP 200. No authentication required. Use this for load balancer and orchestrator health probes. ## Prometheus Metrics ```bash GET /api/metrics/prometheus ``` Returns metrics in Prometheus text format for scraping. Configure your Prometheus instance to scrape this endpoint. 
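The endpoint returns the standard line-oriented Prometheus text format, so it is easy to sanity-check outside Prometheus. A minimal parser sketch (not part of the platform or SDK; it skips `# HELP` / `# TYPE` comments and ignores edge cases such as optional trailing timestamps or spaces inside quoted label values):

```typescript
// Simplified sketch (not platform code): read Prometheus text-format lines
// like `realtime_websocket_connections 42` into name → value pairs.
function parsePrometheusText(body: string): Map<string, number> {
  const metrics = new Map<string, number>();
  for (const line of body.split('\n')) {
    const trimmed = line.trim();
    if (trimmed === '' || trimmed.startsWith('#')) continue; // skip comments/blank lines
    const splitAt = trimmed.lastIndexOf(' ');
    if (splitAt === -1) continue;
    const value = Number(trimmed.slice(splitAt + 1));
    if (!Number.isNaN(value)) metrics.set(trimmed.slice(0, splitAt), value);
  }
  return metrics;
}
```

For anything beyond a quick check, point a real Prometheus scraper at the endpoint instead.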
### Available Metrics | Metric | Type | Description | |--------|------|-------------| | `realtime_events_received_total` | Counter | Total events received by source and topic | | `realtime_events_delivered_total` | Counter | Total events delivered to subscribers | | `realtime_events_filtered_total` | Counter | Total events filtered out by subscription filters | | `realtime_broadcast_latency_seconds` | Histogram | Event broadcast latency (p50, p95, p99) | | `realtime_websocket_connections` | Gauge | Current active WebSocket connections | | `realtime_queue_depth` | Gauge | BullMQ queue depth by queue name | | `realtime_webhook_deliveries_total` | Counter | Webhook delivery attempts by status | | `realtime_cdc_lag_seconds` | Gauge | CDC replication lag | ### Prometheus Configuration ```yaml scrape_configs: - job_name: 'realtime' scrape_interval: 15s static_configs: - targets: ['localhost:3000'] metrics_path: '/api/metrics/prometheus' ``` ## Dashboard Metrics API The platform provides pre-aggregated dashboard metrics: ```bash GET /api/metrics/dashboard ``` Returns structured metrics for the three operational dashboards: ```json { "delivery": { "activeConnections": 42, "reconnectRate": 0.02, "eventsInPerSec": 150, "eventsOutPerSec": 145, "fanoutLatencyP50": 12, "fanoutLatencyP95": 45, "fanoutLatencyP99": 89 }, "pipeline": { "cdcLag": 250, "outboxLag": 100, "eventsCapturedPerSec": 50, "eventsRoutedPerSec": 48, "mappingFailures": 0, "schemaValidationFailures": 0 }, "reliability": { "queueDepth": 5, "retryCount": 2, "dlqSize": 0, "webhookSuccessRate": 0.99, "webhookFailureRate": 0.01 } } ``` ## Hot Topics ```bash GET /api/metrics/hot-topics ``` Returns the most active topics with real-time statistics: ```json [ { "topic": "session.status", "eventsPerSec": 25.5, "subscriberCount": 150, "p95FanoutLatency": 32 } ] ``` ## Admin UI Dashboards The Admin UI provides three dedicated monitoring dashboards: ### Realtime Delivery (`/dashboards/delivery`) - Active WebSocket 
connections over time - Events per second by source (database, sync, socket) - Fanout latency percentiles - Hot topics breakdown - Dropped broadcasts and duplicate suppression ### Database Pipeline (`/dashboards/pipeline`) - CDC lag and outbox lag - Events captured vs routed per second - Mapping failure rate - Schema validation failure rate - Filter drop percentage ### Async Reliability (`/dashboards/reliability`) - BullMQ queue depth over time - Retry counts and dead-letter queue size - Webhook success/failure rates - Worker heartbeat status ## Alerting The Alert Center (`/alerts`) in the Admin UI displays system alerts: - Configurable alert rules with severity levels (info, warning, critical) - Cooldown periods to prevent alert storms - Environment-scoped alerts - Alert history for audit trails ## Logging All services use **pino** structured JSON logging: ```typescript const logger = getLogger(); logger.info({ topic: 'session.status', eventId: 'evt_123' }, 'Event routed'); logger.error({ err, webhookId: 'wh_456' }, 'Webhook delivery failed'); ``` ### Log Levels | Level | `LOG_LEVEL` | Description | |-------|-------------|-------------| | `debug` | `debug` | Verbose debugging (development only) | | `info` | `info` | Standard operational messages (default) | | `warn` | `warn` | Warnings that don't affect functionality | | `error` | `error` | Errors requiring attention | Set `LOG_LEVEL` in your environment configuration. --- ## Testing **URL:** https://docs.realtime.dev/docs/operations/testing **Description:** Test suite overview, running tests, writing new tests, and coverage across all packages. The Realtime Platform has 221+ automated tests across all 12 packages, run with **Vitest**. 
## Running Tests ```bash # All tests across the monorepo pnpm test # Single package pnpm --filter @realtime/backend test # Watch mode pnpm --filter @realtime/event-router test:watch # With coverage pnpm --filter @realtime/auth test -- --coverage ``` ## Test Breakdown | Package | Tests | Description | |---------|-------|-------------| | `shared-utils` | 20 | Date, error, and object utility tests | | `shared-config` | 2 | Environment validation tests | | `observability` | 3 | Logger tests | | `redis-layer` | 7 | Key pattern tests | | `auth` | 9 | JWT, permissions tests | | `event-router` | 11 | Filters, normalizer, subscription registry | | `topic-registry` | 19 | Service CRUD, validation, environment scoping | | `schema-registry` | 16 | Service, compatibility checker, environment scoping | | `metrics` | 11 | Dashboard metrics, hot topics analyzer | | `workers` | 30 | Mapping evaluator, webhook signer/registry, CDC reader | | `backend` | 76 | API routes, document service, debugger, trace store | | `sdk` | 17 | RealtimeClient, EventBus | | **Total** | **221+** | | ## Test Architecture Tests use **dependency injection** throughout — all stores and services are injected via interfaces, so tests use in-memory implementations without requiring PostgreSQL or Redis. ```typescript // Example: testing the topics API route const store = new InMemoryTopicStore(); const service = new TopicRegistryService(store); const app = createApp({ topicService: service, /* ... 
*/ }); // Test against the Express app directly const response = await request(app) .post('/api/topics') .send({ name: 'test.topic', description: 'Test', owner: 'team' }); expect(response.status).toBe(201); ``` ## Writing New Tests ## Configuration The root `vitest.config.ts` provides shared configuration for all packages: - **Test framework:** Vitest - **Coverage provider:** v8 - **File pattern:** `**/*.test.ts` ## Best Practices - Use in-memory store implementations for unit tests (no external dependencies) - Test both success and error paths - Test environment/application scoping for all entity operations - Use `beforeEach` to reset state between tests - Keep tests focused — one assertion concept per test --- ## Workers **URL:** https://docs.realtime.dev/docs/operations/workers **Description:** Background processors for CDC reading, outbox processing, webhook delivery, analytics, and alerts. The Workers app (`apps/workers`) runs background processors that handle asynchronous tasks via BullMQ queues and PostgreSQL CDC streaming. ## Starting Workers ```bash # Development (with hot reload) pnpm --filter @realtime/workers dev # Production pnpm --filter @realtime/workers start ``` Workers start automatically as part of `pnpm dev` via Turborepo. ## Worker Processes ### CDC Reader Monitors your application database for row-level changes using PostgreSQL logical replication. | Config | Default | Description | |--------|---------|-------------| | `CDC_DATABASE_URL` | — | Target application database | | `CDC_POLL_INTERVAL_MS` | `1000` | Polling interval | | `CDC_SLOT_NAME` | `realtime_cdc_slot` | Replication slot name | **Flow:** ```text PostgreSQL WAL → Logical Replication Slot → CDC Reader → Mapping Evaluator → Event Router ``` If `CDC_DATABASE_URL` is not configured, the CDC reader is disabled but all other workers run normally. ### Outbox Worker Processes the transactional outbox table (`event_outbox`) for guaranteed event delivery. 
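The delivery guarantee comes from the producer side of the pattern: the business write and its outbox row commit in a single database transaction, so an event can never be recorded without its data change (or vice versa). A hedged sketch of that idea — `Tx`, `placeOrder`, and the table/column names here are illustrative, not platform APIs:

```typescript
// Hedged sketch of the producer side of the transactional outbox pattern.
// The point: the business row and the outbox row commit in ONE transaction,
// so the Outbox Worker can deliver at-least-once without ever emitting an
// event for a write that rolled back.
interface Tx {
  insert(table: string, row: Record<string, unknown>): void;
}

function placeOrder(
  runInTransaction: (fn: (tx: Tx) => void) => void,
  orderId: string,
): void {
  runInTransaction((tx) => {
    // 1. The business write itself
    tx.insert('orders', { id: orderId, status: 'paid' });
    // 2. The event row, committed atomically with the write above;
    //    the Outbox Worker later polls unprocessed rows and routes them
    tx.insert('event_outbox', {
      topic: 'order.updated',
      payload: JSON.stringify({ orderId, status: 'paid' }),
      processed: false,
    });
  });
}
```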
**Flow:** ```text event_outbox table → Outbox Worker → Event Router → Redis Pub/Sub → Clients ``` The outbox table uses a partial index on unprocessed rows for efficient polling. Processed rows are marked and cleaned up periodically. ### Mapping Evaluator Evaluates domain mappings against CDC events: 1. Receives a raw database change event 2. Looks up matching mappings by table name and operation type 3. Evaluates trigger column conditions (`changed`, `eq`, `and`, `or`) 4. Builds the event payload using `$row.*` template references 5. Forwards the normalized event to the Event Router ### Webhook Delivery Worker Delivers events to registered webhook endpoints via BullMQ: - **HMAC-SHA256 signing** of every payload - **Exponential backoff** with jitter on failure - **Max 5 retries** per delivery - **Dead-letter queue** for permanently failed deliveries - Delivery logs recorded for every attempt ### Analytics Worker Buffers incoming events and periodically flushes aggregated analytics: - Event counts by topic and source - Latency percentiles - Subscriber activity ### Alert Worker Evaluates alert rules against incoming events and metrics: - Configurable alert rules with severity levels - Cooldown periods to prevent alert storms - Alert history for audit trails ## Queue Architecture All worker queues use **BullMQ** backed by Redis: ```text ┌──────────────────────┐ │ Redis │ │ ┌────────────────┐ │ │ │ webhook-delivery│ │ │ │ alerts │ │ │ │ analytics-events│ │ │ │ report-generation│ │ │ └────────────────┘ │ └──────────┬───────────┘ │ ┌──────▼──────┐ │ Workers │ │ Process │ │ jobs from │ │ queues │ └─────────────┘ ``` ## Scaling Workers Workers are stateless and horizontally scalable: - Run multiple worker instances to increase throughput - BullMQ handles job distribution and deduplication - Each worker instance processes jobs from all queues - No coordination required between instances ## Monitoring Workers - Check worker logs for startup messages and processing activity - 
Use the **Async Reliability** dashboard in the Admin UI for queue metrics - Monitor BullMQ queue depth, retry counts, and DLQ size - Worker heartbeat status is visible in the Monitoring page ## Troubleshooting --- ## SDK Authentication **URL:** https://docs.realtime.dev/docs/sdk/authentication **Description:** Authenticate SDK connections with JWT tokens, handle token refresh, and manage session lifecycle. The SDK authenticates via JWT tokens passed during client initialization. The token is used for both HTTP API calls and WebSocket connections. ## Connecting with a Token ```typescript const realtime = new RealtimeClient({ url: 'http://localhost:3000', token: 'your-jwt-token', }); ``` The token is attached to the WebSocket handshake and all subsequent HTTP requests. ## Obtaining a Token Tokens are issued by the platform's auth endpoint: ```bash curl -X POST http://localhost:3000/auth/token \ -H 'Content-Type: application/json' \ -d '{ "applicationId": "your-app-id", "permissions": ["subscribe", "publish"] }' ``` The response contains a signed JWT: ```json { "token": "eyJhbGciOiJIUzI1NiIs...", "expiresAt": 1710003600000 } ``` ## Token Structure JWT tokens contain: | Claim | Description | |-------|-------------| | `sub` | Subject (user or service ID) | | `aud` | Audience (application ID) | | `iss` | Issuer | | `exp` | Expiration timestamp | | `iat` | Issued-at timestamp | | `kid` | Key ID (for key rotation) | | `permissions` | Array of granted permissions | ## Token Helpers The SDK includes utility functions for token inspection: ```typescript const decoded = decodeToken(token); console.log(decoded.sub); // User ID if (isTokenExpired(token)) { console.log('Token has expired'); } const expiresIn = getTokenExpiresIn(token); console.log(`Expires in ${expiresIn}ms`); if (shouldRefreshToken(token)) { // Token is close to expiring, refresh it } ``` ## Permission Roles Tokens can carry one of three permission levels: | Role | Capabilities | |------|-------------| | `admin` 
| Full access — manage topics, schemas, keys, users | | `service` | Publish events, manage documents, broadcast | | `client` | Subscribe to topics, read documents | ## Error Handling ```typescript realtime.on('error', (err) => { if (err.code === 'AUTH_FAILED') { console.log('Authentication failed — token may be expired or invalid'); // Obtain a new token and reconnect } }); ``` ## Verifying a Token Server-side token verification: ```bash curl -X POST http://localhost:3000/auth/verify \ -H 'Content-Type: application/json' \ -d '{ "token": "eyJhbGciOiJIUzI1NiIs..." }' ``` Returns the decoded claims if valid, or a 401 error if invalid/expired. --- ## SDK Overview **URL:** https://docs.realtime.dev/docs/sdk/overview **Description:** Client SDK for browser and Node.js — subscribe to topics, manage documents, and send messages. The Realtime Platform SDK (`@smarterservices/realtime`) provides a unified client for all three platform services: domain subscriptions, sync documents, and socket messaging. ## Installation ```bash npm install @smarterservices/realtime ``` ## Quick Start ```typescript const realtime = new RealtimeClient({ url: 'http://localhost:3000', token: 'your-jwt-token', }); // Subscribe to a domain topic const subId = realtime.subscribe({ topic: 'session.status' }); // Listen for events realtime.on('event', (event) => { console.log('Received:', event.topic, event.payload); }); // Topic-specific listener realtime.on('event:session.status', (event) => { console.log('Session update:', event.payload); }); ``` ## Constructor ```typescript new RealtimeClient(options: RealtimeClientOptions) ``` ### `RealtimeClientOptions` | Option | Type | Required | Default | Description | |--------|------|----------|---------|-------------| | `url` | `string` | Yes | — | Backend API base URL (e.g. 
`"http://localhost:3000"`) | | `token` | `string` | Yes | — | JWT authentication token used for the WebSocket handshake and API requests | | `autoConnect` | `boolean` | No | `true` | If `true`, the client connects immediately on construction. Set to `false` to connect manually via `connect()` | | `reconnection` | `boolean` | No | `true` | Enable automatic reconnection on disconnect | | `reconnectionAttempts` | `number` | No | `10` | Maximum number of reconnection attempts before giving up | | `reconnectionDelay` | `number` | No | `1000` | Base delay in milliseconds between reconnection attempts | ```typescript const realtime = new RealtimeClient({ url: 'http://localhost:3000', token: 'your-jwt-token', autoConnect: false, // Don't connect until we call connect() reconnection: true, reconnectionAttempts: 20, reconnectionDelay: 2000, }); realtime.connect(); // Connect manually ``` ## Service Accessors The client exposes two sub-clients as read-only properties: | Property | Type | Description | |----------|------|-------------| | `realtime.sync` | `SyncClient` | Document CRUD with revision tracking — see [Sync Client](/docs/sdk/sync-client) | | `realtime.socketClient` | `SocketClient` | Channel messaging and direct messages — see [Socket Client](/docs/sdk/socket-client) | ## Methods ### `connect()` Open the WebSocket connection. Called automatically unless `autoConnect` is `false`. ```typescript connect(): void ``` **Returns:** `void` — sets the connection state to `'connecting'`. ```typescript realtime.connect(); ``` --- ### `disconnect()` Close the WebSocket connection. Does not clear subscriptions — they are re-established on reconnect. ```typescript disconnect(): void ``` **Returns:** `void` — sets the connection state to `'disconnected'`. ```typescript realtime.disconnect(); ``` --- ### `subscribe(options)` Subscribe to a domain topic. Events matching the topic (and optional filter) are delivered to your event listeners. 
Subscriptions are automatically re-established after reconnection. ```typescript subscribe(options: SubscribeOptions): string ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `options` | `SubscribeOptions` | Yes | Subscription configuration object (see below) | **`SubscribeOptions`:** | Field | Type | Required | Description | |-------|------|----------|-------------| | `topic` | `string` | Yes | Domain topic to subscribe to (e.g. `"session.status"`, `"order.updated"`) | | `filter` | `SubscriptionFilter` | No | Key-value filter — only events whose payload matches the filter are delivered. Supports equality and `in` operators | **Returns:** `string` — a unique subscription ID (e.g. `"sub_1"`) used to unsubscribe later. ```typescript // Simple subscription const subId = realtime.subscribe({ topic: 'session.status' }); // With a filter — only receive events where campusId = 12 const filtered = realtime.subscribe({ topic: 'session.status', filter: { campusId: 12 }, }); ``` --- ### `unsubscribe(subscriptionId)` Remove a subscription by its ID. The server stops delivering events for this subscription. ```typescript unsubscribe(subscriptionId: string): void ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `subscriptionId` | `string` | Yes | The subscription ID returned by `subscribe()` | **Returns:** `void` ```typescript realtime.unsubscribe('sub_1'); ``` --- ### `on(event, handler)` Register an event listener. 
```typescript on(event: string, handler: EventHandler): void ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `event` | `string` | Yes | Event name to listen for (see event names below) | | `handler` | `EventHandler` | Yes | Callback function `(data: unknown) => void` | **Event Names:** | Event | Payload | Description | |-------|---------|-------------| | `'event'` | `RealtimeEvent` | Fires for every event from any subscribed topic | | `'event:<topic>'` | `RealtimeEvent` | Fires only for events matching the specific topic (e.g. `'event:session.status'`) | | `'connection'` | `{ state: ConnectionState, reason?: string }` | Fires on connection state changes | | `'error'` | `unknown` | Fires on connection or protocol errors | ```typescript realtime.on('event:session.status', (event) => { console.log(event.payload); }); realtime.on('connection', ({ state }) => { console.log('State:', state); // 'connecting' | 'connected' | 'disconnected' | 'reconnecting' }); realtime.on('error', (err) => { console.error('Error:', err); }); ``` --- ### `off(event, handler)` Remove a previously registered event listener. ```typescript off(event: string, handler: EventHandler): void ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `event` | `string` | Yes | Event name to stop listening for | | `handler` | `EventHandler` | Yes | The exact same function reference passed to `on()` | **Returns:** `void` ```typescript const handler = (event) => console.log(event); realtime.on('event:session.status', handler); // Later... realtime.off('event:session.status', handler); ``` --- ### `destroy()` Disconnect, clear all subscriptions, and remove all event listeners. Call this when you're done with the client to release resources. 
```typescript destroy(): void ``` **Returns:** `void` ```typescript realtime.destroy(); ``` ## Read-Only Properties | Property | Type | Description | |----------|------|-------------| | `realtime.state` | `ConnectionState` | Current connection state: `'disconnected'`, `'connecting'`, `'connected'`, or `'reconnecting'` | | `realtime.connected` | `boolean` | `true` if the connection state is `'connected'` | ```typescript if (realtime.connected) { console.log('We are live'); } console.log('Current state:', realtime.state); ``` ## Event Envelope: `RealtimeEvent` All events received by the client follow the `RealtimeEvent` envelope: ```typescript interface RealtimeEvent { id: string; // Unique event ID topic: string; // Domain topic (e.g. "session.status") source: 'database' | 'sync' | 'socket'; // Which service produced the event type: string; // Event type (e.g. "database.update") payload: Record<string, unknown>; // Event data metadata: { timestamp: number; // Unix timestamp (ms) table?: string; // Source table (database events) operation?: 'insert' | 'update' | 'delete'; // DB operation type revision?: number; // Document revision (sync events) }; } ``` ## Server-Side Utilities The SDK also includes utilities for server-side use: | Export | Description | |--------|-------------| | `WebhookReceiver` | Verify and parse incoming webhook deliveries with HMAC-SHA256 signature validation and replay protection — see [Webhook Receiver](/docs/sdk/webhook-receiver) | | `verifyWebhook()` | Standalone function for one-off webhook verification | | `WebhookVerificationError` | Typed error thrown when webhook verification fails | ```typescript const receiver = new WebhookReceiver('whsec_your_secret'); const { event } = receiver.verify(rawBody, headers); ``` ## TypeScript Support The SDK is written in TypeScript and ships with full type definitions. All public APIs are typed, including event payloads and options. 
```typescript import type { RealtimeClientOptions, SubscribeOptions, ConnectionState, WebhookReceiverOptions, WebhookEvent, } from '@smarterservices/realtime'; import type { RealtimeEvent, SubscriptionFilter, } from '@realtime/shared-types'; ``` --- ## Socket Client **URL:** https://docs.realtime.dev/docs/sdk/socket-client **Description:** Channel-based messaging, broadcasting, and direct messages via WebSocket. The Socket Client provides raw WebSocket messaging through named channels. Join channels, publish messages, send direct messages, and track presence. Access the Socket Client via `realtime.socketClient`: ```typescript const socketClient = realtime.socketClient; ``` ## Methods ### `joinChannel(channel)` Join a named channel. You will receive events published to this channel by other clients. ```typescript joinChannel(channel: string): void ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `channel` | `string` | Yes | Channel name to join. Arbitrary string — you define the naming convention (e.g. `"chat:room1"`, `"notifications:user123"`) | **Returns:** `void` — the join is fire-and-forget; the server confirms via the underlying Socket.IO ack. ```typescript realtime.socketClient.joinChannel('chat:room1'); ``` --- ### `leaveChannel(channel)` Leave a previously joined channel. You will stop receiving events from this channel. ```typescript leaveChannel(channel: string): void ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `channel` | `string` | Yes | Channel name to leave | **Returns:** `void` ```typescript realtime.socketClient.leaveChannel('chat:room1'); ``` --- ### `publish(channel, event, data)` Send a message to all members of a channel. ```typescript publish(channel: string, event: string, data: unknown): void ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `channel` | `string` | Yes | Target channel name (e.g. 
`"chat:room1"`) | | `event` | `string` | Yes | Event name — a label for the type of message (e.g. `"message"`, `"typing"`, `"reaction"`) | | `data` | `unknown` | Yes | Event payload — any JSON-serializable value (object, string, number, array, etc.) | **Returns:** `void` — the publish is fire-and-forget. All members of the channel (except the sender) receive the event. ```typescript realtime.socketClient.publish('chat:room1', 'message', { text: 'Hello everyone!', sender: 'user123', timestamp: Date.now(), }); realtime.socketClient.publish('chat:room1', 'typing', { userId: 'user123', isTyping: true, }); ``` --- ### `directMessage(targetSocketId, event, data)` Send a message directly to a specific connected client identified by their socket ID. ```typescript directMessage(targetSocketId: string, event: string, data: unknown): void ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `targetSocketId` | `string` | Yes | The Socket.IO socket ID of the recipient client (e.g. `"abc123"`) | | `event` | `string` | Yes | Event name for the message (e.g. `"notification"`, `"invite"`) | | `data` | `unknown` | Yes | Message payload — any JSON-serializable value | **Returns:** `void` — only the targeted client receives the event. No channel membership is required. 
```typescript realtime.socketClient.directMessage('abc123', 'notification', { text: 'You have a new message', from: 'user456', }); realtime.socketClient.directMessage(targetSocketId, 'invite', { roomId: 'chat:room2', invitedBy: currentUser.name, }); ``` ## Listening for Messages Use the `RealtimeClient`'s event system to listen for incoming socket events: ```typescript // Global listener — receives events from ALL subscribed topics realtime.on('event', (event) => { if (event.source === 'socket') { console.log('Channel:', event.topic); console.log('Data:', event.payload); } }); // Topic-specific listener realtime.subscribe({ topic: 'chat:room1' }); realtime.on('event:chat:room1', (event) => { console.log('Chat message:', event.payload); }); ``` ## Channel Naming Channel names are arbitrary strings. Common patterns: ```text chat:room1 # Chat room notifications:user123 # User notifications game:match5 # Game session dashboard:team-alpha # Team dashboard ``` ## Example: Chat Room ```typescript const realtime = new RealtimeClient({ url: 'http://localhost:3000', token: userToken, }); // Join the room realtime.socketClient.joinChannel('chat:room1'); // Subscribe to messages realtime.subscribe({ topic: 'chat:room1' }); realtime.on('event:chat:room1', (event) => { displayMessage(event.payload); }); // Send a message function sendMessage(text: string) { realtime.socketClient.publish('chat:room1', 'message', { text, sender: currentUser.id, timestamp: Date.now(), }); } // Leave when done function leaveRoom() { realtime.socketClient.leaveChannel('chat:room1'); } ``` --- ## Subscribing to Topics **URL:** https://docs.realtime.dev/docs/sdk/subscribing **Description:** Subscribe to domain topics with optional filters to receive real-time events. The subscription system is the primary way clients receive real-time events. You subscribe to **domain topics** and the platform delivers matching events from any source (database CDC, sync, or socket). 
## `subscribe(options)` ```typescript subscribe(options: SubscribeOptions): string ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `options` | `SubscribeOptions` | Yes | Subscription configuration (see fields below) | **`SubscribeOptions` fields:** | Field | Type | Required | Description | |-------|------|----------|-------------| | `topic` | `string` | Yes | Domain topic name to subscribe to (e.g. `"session.status"`, `"order.updated"`) | | `filter` | `SubscriptionFilter` | No | Key-value filter object — only events whose payload matches every filter key are delivered to this client | | `returnMode` | `SyncEventReturnMode` | No | Controls what data is included in sync document change events: `'full'` (default), `'diff'`, or `'both'`. See [Return Mode](#return-mode) below. | **Returns:** `string` — a unique subscription ID (e.g. `"sub_1"`, `"sub_2"`) that you pass to `unsubscribe()` later. ```typescript const subId = realtime.subscribe({ topic: 'session.status', }); console.log(subId); // "sub_1" ``` ## Listening for Events ### `on('event', handler)` — Global Listener Receive all events across all active subscriptions: ```typescript realtime.on('event', (event: RealtimeEvent) => { console.log('Topic:', event.topic); // e.g. "session.status" console.log('Source:', event.source); // "database" | "sync" | "socket" console.log('Payload:', event.payload); // The actual event data }); ``` ### `on('event:<topic>', handler)` — Topic-Specific Listener Listen for events on a single topic only: ```typescript realtime.on('event:session.status', (event: RealtimeEvent) => { console.log('Session update:', event.payload); }); realtime.on('event:order.updated', (event: RealtimeEvent) => { console.log('Order update:', event.payload); }); ``` ## Subscription Filters Filters narrow down which events you receive **after** topic routing. Only events whose payload matches the filter are delivered to your client. 
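Conceptually, the server applies a predicate like the following to each event payload before delivery. This is a sketch of the documented semantics only, not the platform's actual implementation; `FilterValue` and `matchesFilter` are illustrative names:

```typescript
// Sketch of the documented filter semantics — NOT the server's code.
// Scalar values require strict equality, `{ in: [...] }` matches any listed
// value, and every filter key must match for the event to be delivered (AND).
type FilterValue = string | number | boolean | { in: Array<string | number | boolean> };

function matchesFilter(
  payload: Record<string, unknown>,
  filter: Record<string, FilterValue>,
): boolean {
  return Object.entries(filter).every(([key, condition]) => {
    const actual = payload[key];
    if (typeof condition === 'object' && condition !== null) {
      return condition.in.some((allowed) => allowed === actual); // `in` operator
    }
    return actual === condition; // scalar equality
  });
}
```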
### `SubscriptionFilter` ```typescript type SubscriptionFilter = Record<string, unknown>; ``` Filter values can be: - **Scalar** — exact equality match against the payload field - **`{ in: [...] }`** — match if the payload field equals any value in the array All filter keys use **AND** logic — every key must match for the event to be delivered. ### Equality Filter ```typescript realtime.subscribe({ topic: 'session.status', filter: { campusId: 12 }, }); // Only receives events where payload.campusId === 12 ``` ### Multiple Fields (AND) ```typescript realtime.subscribe({ topic: 'session.status', filter: { campusId: 12, status: 'active', }, }); // Both conditions must match ``` ### `in` Operator Match against multiple possible values for a single field: ```typescript realtime.subscribe({ topic: 'session.status', filter: { status: { in: ['pending', 'active'] }, }, }); // Receives events where status is 'pending' OR 'active' ``` ### Combined Filters ```typescript realtime.subscribe({ topic: 'session.status', filter: { campusId: 12, status: { in: ['pending', 'active'] }, }, }); // campusId must be 12 AND status must be 'pending' or 'active' ``` ## Return Mode When subscribing to sync document topics (e.g. `sync.session.updated`), you can control how much data each event carries over the wire using the `returnMode` option: | Mode | `payload.data` | `payload.diff` | Description | |------|:-:|:-:|---| | `'full'` (default) | ✅ | ❌ | Full document state — backward-compatible default | | `'diff'` | ❌ | ✅ | Only what changed — minimal wire traffic | | `'both'` | ✅ | ✅ | Full state + change context | ### Full Mode (Default) Receive the complete document data on every change. 
This is the default and requires no extra configuration: ```typescript realtime.subscribe({ topic: 'sync.session.updated' }); realtime.on('event:sync.session.updated', (event) => { console.log(event.payload.data); // Full document: { status: 'active', count: 5 } }); ``` ### Diff Mode Receive only the changes — ideal for large documents where you want to minimize bandwidth: ```typescript realtime.subscribe({ topic: 'sync.session.updated', returnMode: 'diff', }); realtime.on('event:sync.session.updated', (event) => { console.log(event.payload.diff); // { // added: { newField: 42 }, // removed: ['oldField'], // changed: { status: { from: 'active', to: 'ended' } } // } }); ``` ### Both Mode Receive both the full document and the diff: ```typescript realtime.subscribe({ topic: 'sync.session.updated', returnMode: 'both', }); realtime.on('event:sync.session.updated', (event) => { console.log(event.payload.data); // Full document console.log(event.payload.diff); // What changed }); ``` ### Diff Shape The `diff` object has the following structure: ```typescript interface JsonDiff { added: Record<string, unknown>; // Keys present in new but not old removed: string[]; // Keys present in old but not new changed: Record<string, { from: unknown; to: unknown }>; // Keys present in both whose values differ } ``` > **Note:** `returnMode` only affects sync-sourced events. Non-sync events (database CDC, socket) are delivered unchanged regardless of this setting. ## `unsubscribe(subscriptionId)` ```typescript unsubscribe(subscriptionId: string): void ``` | Parameter | Type | Required | Description | |-----------|------|----------|-------------| | `subscriptionId` | `string` | Yes | The subscription ID returned by `subscribe()` | **Returns:** `void` — the server stops delivering events for this subscription. ```typescript const subId = realtime.subscribe({ topic: 'session.status' }); // Later, when you no longer need these events: realtime.unsubscribe(subId); ``` ## Multiple Subscriptions You can hold multiple active subscriptions simultaneously. 
Each returns its own `subId`:

```typescript
const sub1 = realtime.subscribe({ topic: 'session.status' });
const sub2 = realtime.subscribe({ topic: 'order.updated' });
const sub3 = realtime.subscribe({ topic: 'chat.room.message', filter: { roomId: 'room1' } });

// All events arrive through the same listener
realtime.on('event', (event) => {
  switch (event.topic) {
    case 'session.status': handleSession(event); break;
    case 'order.updated': handleOrder(event); break;
    case 'chat.room.message': handleChat(event); break;
  }
});

// Unsubscribe individually
realtime.unsubscribe(sub3);
```

## Reconnection Behavior

When the WebSocket connection drops and reconnects, all active subscriptions are **automatically re-established**. You don't need to re-subscribe manually. The client internally tracks all subscription IDs and options, and re-emits them to the server on reconnect.

```typescript
realtime.on('connection', ({ state }) => {
  if (state === 'reconnecting') {
    console.log('Reconnecting... subscriptions will be restored');
  }
  if (state === 'connected') {
    console.log('Connected — all subscriptions active');
  }
});
```

## Event Envelope

Every event delivered to subscribers follows the standard `RealtimeEvent` envelope:

```typescript
interface RealtimeEvent {
  id: string;                             // Unique event ID (e.g. "evt_a1b2c3d4")
  topic: string;                          // Domain topic (e.g. "session.status")
  source: 'database' | 'sync' | 'socket'; // Which service produced the event
  type: string;                           // Event type (e.g. "database.update")
  payload: Record<string, unknown>;       // The actual event data
  metadata: {
    timestamp: number;                          // Unix timestamp in milliseconds
    table?: string;                             // Source table name (database events only)
    operation?: 'insert' | 'update' | 'delete'; // DB operation (database events only)
    revision?: number;                          // Document revision (sync events only)
  };
}
```

---

## Sync Client

**URL:** https://docs.realtime.dev/docs/sdk/sync-client

**Description:** Create, read, update, and delete real-time documents with revision tracking.
The Sync Client provides CRUD operations on real-time documents stored in RedisJSON. Every mutation automatically increments the document revision and broadcasts changes to subscribers.

Access the Sync Client via `realtime.sync`:

```typescript
const syncClient = realtime.sync;
```

## Methods

### `create(service, data, id?)`

Create a new document in the given service namespace.

```typescript
create(service: string, data: Record<string, unknown>, id?: string): Promise<SyncDocument>
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `service` | `string` | Yes | Service namespace the document belongs to (e.g. `"session"`, `"order"`) |
| `data` | `Record<string, unknown>` | Yes | Initial document payload — any JSON-serializable object |
| `id` | `string` | No | Custom document ID. If omitted, a prefixed unique ID is generated (e.g. `doc_a1b2c3d4`) |

**Returns:** `Promise<SyncDocument>` — the created document with `id`, `revision` (1), `service`, and `data`.

```typescript
// Auto-generated ID
const doc = await realtime.sync.create('session', {
  status: 'active',
  participantCount: 0,
});
console.log(doc.id);       // "doc_a1b2c3d4"
console.log(doc.revision); // 1

// Custom ID
const doc2 = await realtime.sync.create('session', {
  status: 'active',
}, 'my-custom-id');
console.log(doc2.id); // "my-custom-id"
```

---

### `get(service, id)`

Fetch the current state of a document.

```typescript
get(service: string, id: string): Promise<SyncDocument>
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `service` | `string` | Yes | Service namespace (e.g. `"session"`) |
| `id` | `string` | Yes | Document ID to retrieve |

**Returns:** `Promise<SyncDocument>` — the document with its current `data`, `revision`, and metadata.

**Throws:** Error if the document does not exist.
```typescript
const doc = await realtime.sync.get('session', 'doc_a1b2c3d4');
console.log(doc.data.status); // "active"
console.log(doc.revision);    // 3
```

---

### `replace(service, id, data, expectedRevision?)`

Replace the entire document data. All existing fields are overwritten.

```typescript
replace(service: string, id: string, data: Record<string, unknown>, expectedRevision?: number): Promise<SyncDocument>
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `service` | `string` | Yes | Service namespace |
| `id` | `string` | Yes | Document ID to replace |
| `data` | `Record<string, unknown>` | Yes | New document payload — completely replaces existing data |
| `expectedRevision` | `number` | No | If provided, the operation fails with a conflict error if the document's current revision doesn't match |

**Returns:** `Promise<SyncDocument>` — the updated document with incremented `revision`.

**Throws:** Conflict error (409) if `expectedRevision` doesn't match.

```typescript
const updated = await realtime.sync.replace('session', 'doc_a1b2c3d4', {
  status: 'ended',
  endedAt: Date.now(),
});
console.log(updated.revision); // 4

// With optimistic concurrency
await realtime.sync.replace('session', 'doc_a1b2c3d4', newData, 4);
```

---

### `merge(service, id, partial, expectedRevision?)`

Deep merge partial data into the existing document. Only the specified fields are updated — all other fields remain unchanged.

```typescript
merge(service: string, id: string, partial: Record<string, unknown>, expectedRevision?: number): Promise<SyncDocument>
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `service` | `string` | Yes | Service namespace |
| `id` | `string` | Yes | Document ID to merge into |
| `partial` | `Record<string, unknown>` | Yes | Partial data to deep merge into the document |
| `expectedRevision` | `number` | No | Optimistic concurrency guard |

**Returns:** `Promise<SyncDocument>` — the updated document with incremented `revision`.
```typescript
await realtime.sync.merge('session', 'doc_a1b2c3d4', {
  notes: 'Session completed successfully',
});
// Only 'notes' is added/updated; status, participantCount, etc. remain unchanged
```

---

### `mutate(service, id, mutations, expectedRevision?)`

Apply structured mutations to the document. Unlike `merge`, the mutations object is passed directly to the document service which applies them as discrete field-level operations in a single revision bump.

```typescript
mutate(service: string, id: string, mutations: Record<string, unknown>, expectedRevision?: number): Promise<SyncDocument>
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `service` | `string` | Yes | Service namespace |
| `id` | `string` | Yes | Document ID to mutate |
| `mutations` | `Record<string, unknown>` | Yes | Mutation operations to apply atomically |
| `expectedRevision` | `number` | No | Optimistic concurrency guard |

**Returns:** `Promise<SyncDocument>` — the updated document with incremented `revision`.

```typescript
await realtime.sync.mutate('session', 'doc_a1b2c3d4', {
  status: 'ended',
  participantCount: 0,
  endedAt: Date.now(),
}, 5);
```

---

### `delete(service, id)`

Soft-delete a document (sets `deleted: true` on the document).

```typescript
delete(service: string, id: string): Promise<void>
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `service` | `string` | Yes | Service namespace |
| `id` | `string` | Yes | Document ID to delete |

**Returns:** `Promise<void>`

```typescript
await realtime.sync.delete('session', 'doc_a1b2c3d4');
```

## Return Type: `SyncDocument`

All write operations return a `SyncDocument`:

```typescript
interface SyncDocument {
  service: string;               // Namespace (e.g. "session")
  id: string;                    // Document ID
  revision: number;              // Auto-incrementing version number
  data: Record<string, unknown>; // Document payload
}
```

## Optimistic Concurrency

The `replace`, `merge`, and `mutate` methods accept an optional `expectedRevision` parameter.
If the document's current revision doesn't match, the operation fails with a conflict error:

```typescript
try {
  await realtime.sync.replace('session', docId, newData, 3);
} catch (err) {
  if (err.message.includes('conflict')) {
    // Document was modified by another writer since revision 3
    const latest = await realtime.sync.get('session', docId);
    // Retry with the latest revision
  }
}
```

## Listening for Document Changes

Subscribe to sync events to receive real-time updates when any client modifies a document:

```typescript
realtime.subscribe({ topic: 'sync.session.updated' });

realtime.on('event:sync.session.updated', (event) => {
  console.log('Operation:', event.metadata.operation);
  console.log('Revision:', event.metadata.revision);
  console.log('Data:', event.payload.data);
});
```

### Receiving Diffs

By default, events include the full document data. You can opt into receiving a **diff** of what changed by setting `returnMode` when subscribing:

```typescript
// Only receive the diff — minimal wire traffic
realtime.subscribe({
  topic: 'sync.session.updated',
  returnMode: 'diff',
});

realtime.on('event:sync.session.updated', (event) => {
  const { diff } = event.payload;
  console.log('Added fields:', diff.added);
  console.log('Removed fields:', diff.removed);
  console.log('Changed fields:', diff.changed);
  // changed: { status: { from: 'active', to: 'ended' } }
});
```

Use `returnMode: 'both'` to receive both the full document and the diff in every event. See [Subscribing — Return Mode](/docs/sdk/subscribing#return-mode) for the full reference.

---

## Webhook Receiver

**URL:** https://docs.realtime.dev/docs/sdk/webhook-receiver

**Description:** Verify and parse incoming webhook deliveries from the Realtime Platform using the SDK's built-in receiver.

The SDK includes a server-side webhook receiver that validates HMAC-SHA256 signatures, checks for replay attacks, and returns a typed `RealtimeEvent` you can use directly in your application.
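The verification scheme itself is documented on this page: the platform computes an HMAC-SHA256 over `${timestamp}.${rawBody}` and delivers it as `t=<timestamp>,v1=<signature>`, and the receiver rejects stale timestamps. The following is a minimal sketch of that scheme using `node:crypto` — not the SDK's actual implementation, and `sign`/`verifySignature` are illustrative helper names (in production, the platform signs and the SDK verifies):

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Illustrative: build the `t=<timestamp>,v1=<signature>` header value.
function sign(secret: string, timestamp: number, rawBody: string): string {
  const mac = createHmac('sha256', secret)
    .update(`${timestamp}.${rawBody}`)
    .digest('hex');
  return `t=${timestamp},v1=${mac}`;
}

// Illustrative: check format, timestamp tolerance, and HMAC in constant time.
function verifySignature(
  secret: string,
  header: string,
  rawBody: string,
  toleranceMs = 300_000,
): boolean {
  const match = /^t=(\d+),v1=([0-9a-f]+)$/.exec(header);
  if (!match) return false; // INVALID_SIGNATURE_FORMAT
  const timestamp = Number(match[1]);
  if (Math.abs(Date.now() - timestamp) > toleranceMs) return false; // TIMESTAMP_EXPIRED
  const expected = createHmac('sha256', secret)
    .update(`${timestamp}.${rawBody}`)
    .digest('hex');
  const a = Buffer.from(expected);
  const b = Buffer.from(match[2]);
  // Constant-time comparison; unequal lengths mean SIGNATURE_MISMATCH
  return a.length === b.length && timingSafeEqual(a, b);
}
```

Note the use of `timingSafeEqual` rather than `===`: comparing MACs with ordinary string equality can leak timing information to an attacker probing signatures byte by byte.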
## Quick Start

```typescript
import express from 'express';
// WebhookReceiver is exported from the Realtime SDK (import path depends on
// your installation).

const app = express();
const receiver = new WebhookReceiver('whsec_your_endpoint_secret');

// Use raw body parsing so the signature can be verified against the exact bytes
app.post('/webhooks/realtime', express.text({ type: '*/*' }), (req, res) => {
  try {
    const { event } = receiver.verify(req.body, req.headers);
    console.log('Received:', event.topic, event.type, event.payload);
    res.sendStatus(200);
  } catch (err) {
    console.error('Verification failed:', err.message);
    res.sendStatus(400);
  }
});

app.listen(4100);
```

## `WebhookReceiver`

The class-based receiver. Create one instance per webhook endpoint secret and call `verify()` on every incoming request.

### Constructor

```typescript
new WebhookReceiver(secret: string, options?: WebhookReceiverOptions)
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `secret` | `string` | Yes | The shared secret for this webhook endpoint, as configured when creating the endpoint via the [Webhooks API](/docs/api/webhooks) |
| `options` | `WebhookReceiverOptions` | No | Optional configuration (see below) |

**`WebhookReceiverOptions`:**

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `toleranceMs` | `number` | `300000` (5 min) | Maximum age in milliseconds for a webhook timestamp to be considered valid. Requests older than this are rejected as potential replay attacks |

```typescript
// Default: 5-minute tolerance
const receiver = new WebhookReceiver('whsec_my_secret');

// Custom: 30-second tolerance for stricter security
const strict = new WebhookReceiver('whsec_my_secret', { toleranceMs: 30_000 });
```

### `verify(body, headers)`

Verify a webhook request and return the parsed event.

```typescript
verify(
  body: string,
  headers: Record<string, string | string[] | undefined>
): WebhookEvent
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `body` | `string` | Yes | The raw request body string. Must be the exact bytes received — do not re-serialize or parse before verification |
| `headers` | `Record<string, string \| string[] \| undefined>` | Yes | HTTP headers object. In Express this is `req.headers`. The receiver looks for `X-Webhook-Signature` (case-insensitive) |

**Returns:** `WebhookEvent`

```typescript
interface WebhookEvent {
  /** The verified and parsed RealtimeEvent */
  event: RealtimeEvent;
  /** Timestamp (ms) from the signature header */
  timestamp: number;
  /** The raw body string that was verified */
  rawBody: string;
}
```

**Throws:** `WebhookVerificationError` if verification fails (see [Error Handling](#error-handling) below).

```typescript
const { event, timestamp, rawBody } = receiver.verify(req.body, req.headers);

console.log(event.id);      // "evt_a1b2c3d4"
console.log(event.topic);   // "session.status"
console.log(event.source);  // "database"
console.log(event.type);    // "row.update"
console.log(event.payload); // { sessionId: "abc123", status: "active" }
```

## `verifyWebhook()` (Standalone Function)

A convenience function for one-off verification without instantiating a class:

```typescript
const { event } = verifyWebhook(body, headers, 'whsec_my_secret');
```

```typescript
verifyWebhook(
  body: string,
  headers: Record<string, string | string[] | undefined>,
  secret: string,
  options?: WebhookReceiverOptions
): WebhookEvent
```

## Error Handling

All verification failures throw a `WebhookVerificationError` with a machine-readable `code` property:

```typescript
try {
  const { event } = receiver.verify(body, headers);
  // Process event...
} catch (err) {
  if (err instanceof WebhookVerificationError) {
    console.error(`Webhook rejected [${err.code}]: ${err.message}`);
  }
}
```

| Code | Meaning |
|------|---------|
| `MISSING_SIGNATURE` | The `X-Webhook-Signature` header was not found in the request |
| `INVALID_SIGNATURE_FORMAT` | The header is not in the expected `t=<timestamp>,v1=<signature>` format |
| `TIMESTAMP_EXPIRED` | The signature timestamp is outside the tolerance window (replay protection) |
| `SIGNATURE_MISMATCH` | The computed HMAC does not match — the secret is wrong or the body was tampered with |
| `INVALID_PAYLOAD` | The body is not valid JSON or is missing required `RealtimeEvent` fields (`id`, `topic`) |

## Signature Format

The platform signs every webhook delivery with HMAC-SHA256. The signature is sent in the `X-Webhook-Signature` header using this format:

```
t=<timestamp>,v1=<signature>
```

The signed content is `${timestamp}.${rawBody}` — the timestamp concatenated with a dot and the raw JSON body. Including the timestamp in the signed content prevents replay attacks even if an attacker captures a valid signature.

**Headers sent with each delivery:**

| Header | Description |
|--------|-------------|
| `X-Webhook-Signature` | Signature in `t=<timestamp>,v1=<signature>` format |
| `X-Webhook-Timestamp` | Signing timestamp (milliseconds) |
| `Content-Type` | `application/json` |

## Framework Examples

### Express

```typescript
import express from 'express';

const app = express();
const receiver = new WebhookReceiver(process.env.WEBHOOK_SECRET!);

// IMPORTANT: Use raw/text body parsing — not JSON parsing —
// so the signature can be verified against the original bytes.
app.post(
  '/webhooks/realtime',
  express.text({ type: 'application/json' }),
  (req, res) => {
    try {
      const { event } = receiver.verify(req.body, req.headers);

      switch (event.topic) {
        case 'session.status':
          handleSessionUpdate(event.payload);
          break;
        case 'order.created':
          handleNewOrder(event.payload);
          break;
        default:
          console.log('Unhandled topic:', event.topic);
      }

      res.sendStatus(200);
    } catch (err) {
      console.error('Webhook error:', err);
      res.sendStatus(400);
    }
  }
);
```

### Fastify

```typescript
import Fastify from 'fastify';

const fastify = Fastify();
const receiver = new WebhookReceiver(process.env.WEBHOOK_SECRET!);

// Fastify provides the raw body via request.body when using
// content-type-parser for text
fastify.addContentTypeParser(
  'application/json',
  { parseAs: 'string' },
  (req, body, done) => done(null, body)
);

fastify.post('/webhooks/realtime', async (request, reply) => {
  try {
    const { event } = receiver.verify(
      request.body as string,
      request.headers as Record<string, string | string[] | undefined>
    );
    console.log('Event:', event.topic, event.payload);
    return reply.code(200).send({ ok: true });
  } catch {
    return reply.code(400).send({ error: 'Invalid webhook' });
  }
});
```

### Next.js API Route (App Router)

```typescript
import { NextRequest, NextResponse } from 'next/server';

const receiver = new WebhookReceiver(process.env.WEBHOOK_SECRET!);

export async function POST(req: NextRequest) {
  const body = await req.text();
  const headers: Record<string, string> = {};
  req.headers.forEach((value, key) => {
    headers[key] = value;
  });

  try {
    const { event } = receiver.verify(body, headers);
    console.log('Webhook event:', event.topic, event.type);
    return NextResponse.json({ received: true });
  } catch (err) {
    if (err instanceof WebhookVerificationError) {
      return NextResponse.json({ error: err.code }, { status: 400 });
    }
    return NextResponse.json({ error: 'Internal error' }, { status: 500 });
  }
}
```

## Important: Raw Body Requirement

The webhook signature is computed over the **exact raw bytes** of the request body.
If your framework parses the body as JSON before your handler runs, you'll need to ensure the raw string is also available. Most frameworks provide a way to access the raw body:

| Framework | Approach |
|-----------|----------|
| **Express** | Use `express.text({ type: 'application/json' })` middleware on the webhook route |
| **Fastify** | Register a custom content-type parser with `parseAs: 'string'` |
| **Next.js** | Use `await req.text()` in the App Router |
| **Koa** | Use `koa-body` with `{ text: true }` or `ctx.request.rawBody` |

## TypeScript Types

All types used on this page — `WebhookReceiver`, `verifyWebhook`, `WebhookReceiverOptions`, `WebhookEvent`, `WebhookVerificationError`, and `RealtimeEvent` — are exported from the SDK.

---

## Database CDC Service

**URL:** https://docs.realtime.dev/docs/services/database

**Description:** Stream PostgreSQL row-level changes to real-time subscribers using Change Data Capture.

The Database Service streams changes from your **application database** to real-time subscribers using PostgreSQL Change Data Capture (CDC). Administrators define **domain mappings** that control which table changes produce events and how payloads are constructed.

## How It Works

```text
PostgreSQL WAL
  → Logical Replication Slot
  → CDC Reader (worker)
  → Mapping Evaluator
  → Event Router
  → Redis Pub/Sub
  → Socket Gateway
  → Connected Clients
```

## Two Databases

| Connection | Configured In | Purpose |
|-----------|--------------|----------|
| **Platform DB** | `.env` — `DATABASE_URL` or `POSTGRES_*` | Realtime's own metadata (topics, schemas, mappings, keys) |
| **Target DB** | **Application Settings** (per environment) | Your application database that the CDC reader monitors |

## Target Database Preparation

Before the CDC reader can monitor your application database, the target database needs to be configured with the correct settings and user privileges.

## Connecting in Application Settings

## Domain Mappings

Mappings define which table changes become realtime events. Tables do **not** automatically stream events — every mapping must be explicitly configured.
### Mapping Structure

```json
{
  "id": "map_session_status_v1",
  "table": "sessions",
  "events": ["insert", "update"],
  "topic": "session.status",
  "triggerColumns": ["status", "ended_at"],
  "when": {
    "or": [
      { "changed": "status" },
      { "changed": "ended_at" }
    ]
  },
  "payload": {
    "sessionId": "$row.id",
    "status": "$row.status",
    "endedAt": "$row.ended_at",
    "campusId": "$row.campus_id"
  }
}
```

### Configuration Fields

| Field | Required | Description |
|-------|----------|-------------|
| `table` | Yes | Source table to monitor (use `schema.table` for non-public schemas, e.g. `salesforce.sp_exam_session__c`) |
| `topic` | Yes | Target domain topic for events |
| `events` | Yes | Operations to capture: `insert`, `update`, `delete` |
| `triggerColumns` | No | Columns that trigger the event (update only fires if these change) |
| `when` | No | Conditional rules (see below) |
| `payload` | Yes | Payload template with `$row.*` column references |
| `environment` | No | Environment scope (`development`, `staging`, `production`) |

### Creating Mappings

Mappings can be created through the Admin UI or via `POST /api/database/mappings` (see the [API Reference](#api-reference) below).

## Mapping Conditions

Mappings support conditional rules to control when events fire:

### Changed Column

Fire only when a specific column changes:

```json
{ "changed": "status" }
```

### Equality Check

Fire only when a column equals a specific value:

```json
{ "eq": ["$row.status", "active"] }
```

### Logical Operators

Combine conditions with `and` / `or`:

```json
{
  "and": [
    { "changed": "status" },
    { "eq": ["$row.status", "active"] }
  ]
}
```

```json
{
  "or": [
    { "changed": "status" },
    { "changed": "ended_at" }
  ]
}
```

## Payload Templates

Payload fields use `$row.*` references to extract values from the changed row:

```json
{
  "sessionId": "$row.id",
  "status": "$row.status",
  "endedAt": "$row.ended_at",
  "campusId": "$row.campus_id"
}
```

The mapping evaluator resolves these references at runtime using the actual row data from the CDC event.
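The resolution step above can be sketched as a small substitution function. This is an illustrative model of the behavior, not the platform's actual evaluator; `resolvePayload` is a hypothetical name:

```typescript
type Row = Record<string, unknown>;

// Illustrative sketch: walk the payload template, replacing `$row.<column>`
// string references with the corresponding column value from the CDC row.
// Non-reference values pass through as literals.
function resolvePayload(
  template: Record<string, unknown>,
  row: Row,
): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(template)) {
    if (typeof value === 'string' && value.startsWith('$row.')) {
      out[key] = row[value.slice('$row.'.length)]; // substitute column value
    } else {
      out[key] = value; // literal passes through unchanged
    }
  }
  return out;
}
```

Given the `sessions` mapping above and a changed row `{ id: 'abc123', status: 'active', campus_id: 12 }`, this would produce `{ sessionId: 'abc123', status: 'active', campusId: 12 }` as the event payload.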
## Mapping Validation

Mappings are validated against the target topic's schema:

- Required fields must be present in the payload template
- Field types must be compatible
- Schema violations are reported inline in the Admin UI

## Environment Promotion

Mappings support a three-stage promotion workflow:

```text
Development → Staging → Production
```

### Environment Grid

The **Environment Grid** page (`/environments`) shows all mappings across all environments with color-coded sync indicators:

- **Green** — in sync across environments
- **Yellow** — newer version available for promotion
- **Red** — missing in target environment

## Outbox Pattern

In addition to CDC, the platform supports the **transactional outbox** pattern:

```text
Application write
  → event_outbox table insert (in same transaction)
  → Outbox Worker polls for unprocessed rows
  → Event Router
  → Redis Pub/Sub
  → Clients
```

The `event_outbox` table uses a partial index on unprocessed rows for efficient polling.

## Verifying CDC

1. Check the **Database Pipeline** dashboard in the Admin UI — the reader state should show **Streaming**
2. Make a change in a monitored table
3. Look for `CDC event routed` log entries in the backend logs
4. Check the **Event History** tab in the Database Pipeline page to see captured events
5. Use the **Event Debugger** in the Admin UI to inspect routed events

## Troubleshooting

| Error | Cause | Fix |
|-------|-------|-----|
| *No CDC database configured* | Application doesn't have CDC connection set for this environment | Go to Application Settings and configure the CDC database URL |
| *must be owner of table* | CDC user doesn't have permission to manage publications on this table | Grant `SUPERUSER` or `pg_publication_owner` to the CDC user |
| *relation does not exist* | Wrong schema — table has a schema prefix (e.g. `salesforce.my_table`) | Ensure the table name in your mapping includes the schema prefix |
| *wal_level is not logical* | Target database hasn't been configured for logical replication | Set `wal_level = 'logical'` and restart PostgreSQL |
| *replication slot already exists* | Slot from a previous session wasn't cleaned up | Drop it with `SELECT pg_drop_replication_slot('realtime_cdc_slot');` |

## API Reference

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/api/database/mappings` | List all mappings |
| `GET` | `/api/database/mappings/status` | Environment status for all mappings |
| `GET` | `/api/database/mappings/:id` | Get mapping by ID |
| `POST` | `/api/database/mappings` | Create mapping |
| `PATCH` | `/api/database/mappings/:id` | Update mapping |
| `DELETE` | `/api/database/mappings/:id` | Delete mapping |
| `GET` | `/api/database/mappings/topic/:topic` | Get mappings by topic |
| `GET` | `/api/database/mappings/:id/versions` | List versions |
| `POST` | `/api/database/mappings/:id/versions` | Create version snapshot |
| `POST` | `/api/database/mappings/:id/promote` | Promote to next environment |
| `GET` | `/api/database/subscriptions` | List subscriptions |
| `POST` | `/api/database/subscriptions` | Create subscription |
| `POST` | `/api/database/subscriptions/:id/activate` | Activate |
| `POST` | `/api/database/subscriptions/:id/deactivate` | Deactivate |

---

## Socket Service

**URL:** https://docs.realtime.dev/docs/services/socket

**Description:** Raw WebSocket messaging with channels, broadcasting, direct messaging, and presence.

The Socket Service provides raw WebSocket messaging capabilities built on **Socket.IO**. It supports channel-based subscriptions, message broadcasting, direct messaging between clients, and presence tracking.

## Core Concepts

### Channels

Channels are named groups that clients can join and leave. Messages published to a channel are delivered to all members.
```text
chat:room1
notifications:user123
game:match5
```

Channel names are arbitrary strings — you define the naming convention for your application.

### Connection Lifecycle

```text
Client connects → Authentication → Join channels → Send/Receive → Disconnect
```

All WebSocket connections are authenticated via JWT tokens passed during the initial handshake.

## Usage

### Join a Channel

### Publish to a Channel

Send a message to all members of a channel.

### Direct Messaging

Send a message to a specific connected client by their socket ID:

```typescript
realtime.socketClient.directMessage(targetSocketId, 'notification', {
  text: 'You have a new message',
});
```

### Leave a Channel

```typescript
realtime.socketClient.leaveChannel('chat:room1');
```

## Connection Events

Listen for connection lifecycle events:

```typescript
realtime.on('connection', ({ state }) => {
  console.log('Connection state:', state);
  // 'connecting' | 'connected' | 'disconnected' | 'reconnecting'
});

realtime.on('error', (err) => {
  console.error('Connection error:', err);
});
```

Socket.IO handles automatic reconnection with exponential backoff.
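The channel model above (join, publish to all members, leave) can be sketched with an in-memory stub. This is not the SDK client — `ChannelStub`, `joinChannel`, and `publish` are illustrative names chosen to mirror the documented `leaveChannel()`/`directMessage()` style; verify actual method names against your SDK version:

```typescript
type Handler = (payload: Record<string, unknown>) => void;

// Illustrative in-memory model of channel semantics: members join a named
// channel, publishes fan out to every current member, leaves stop delivery.
class ChannelStub {
  private channels = new Map<string, Set<Handler>>();

  joinChannel(channel: string, handler: Handler): void {
    if (!this.channels.has(channel)) this.channels.set(channel, new Set());
    this.channels.get(channel)!.add(handler); // member now receives messages
  }

  publish(channel: string, payload: Record<string, unknown>): number {
    const members = this.channels.get(channel) ?? new Set<Handler>();
    for (const handler of members) handler(payload); // deliver to all members
    return members.size; // number of members reached
  }

  leaveChannel(channel: string, handler: Handler): void {
    this.channels.get(channel)?.delete(handler); // stop receiving messages
  }
}
```

In the real service the fan-out happens server-side (and across instances via Redis Pub/Sub), but the membership semantics are the same.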
## Presence

The Socket Service tracks which clients are connected to each channel:

- **Join** — presence event broadcast when a client joins a channel
- **Leave** — presence event broadcast when a client leaves or disconnects
- **Members** — query current members of a channel

### Query Channel Members

```bash
curl http://localhost:3000/api/socket/channels/chat:room1/members
```

### List Active Channels

```bash
curl http://localhost:3000/api/socket/channels
```

### Connection Count

```bash
curl http://localhost:3000/api/socket/connections
```

## REST API Reference

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/api/socket/channels` | List all active channels |
| `GET` | `/api/socket/channels/:channel/members` | List members in a channel |
| `GET` | `/api/socket/connections` | Get total connection count |
| `POST` | `/api/socket/broadcast` | Broadcast to channel |

## WebSocket Events

| Event | Direction | Payload | Description |
|-------|-----------|---------|-------------|
| `subscribe` | Client → Server | `{ topic, filter? }` | Subscribe to a topic/channel |
| `unsubscribe` | Client → Server | `{ subscriptionId }` | Unsubscribe |
| `event` | Server → Client | `RealtimeEvent` | Delivered event |

## Admin UI

The **Socket Channels** page (`/socket`) in the Admin UI provides:

- List of all active channels with member counts
- View members in each channel
- Broadcast events to channels from the UI
- Real-time connection metrics

## Architecture

The Socket Gateway is integrated into the backend Express server via Socket.IO:

```text
Client WebSocket
  → Socket.IO Server (backend)
  → Authentication middleware
  → Channel subscription manager
  → Redis Pub/Sub (cross-instance fanout)
  → Event delivery to connected clients
```

When running multiple backend instances, Redis Pub/Sub ensures that messages published on one instance are delivered to subscribers connected to other instances.
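Cross-instance fan-out keys off a shared Redis channel name; per the architecture overview, events are published on `realtime:event:{topic}`. The helpers below are illustrative (not SDK exports) and show only the naming contract each instance must agree on:

```typescript
// Illustrative: build the Redis Pub/Sub channel name for a domain topic,
// following the documented `realtime:event:{topic}` pattern.
function channelFor(topic: string): string {
  return `realtime:event:${topic}`;
}

// Illustrative inverse: recover the topic from a Redis channel name, or null
// if the channel doesn't belong to the realtime event namespace.
function parseChannel(channel: string): string | null {
  const prefix = 'realtime:event:';
  return channel.startsWith(prefix) ? channel.slice(prefix.length) : null;
}
```

Because every instance derives the same channel name from the topic, an event published on one instance is picked up by the subscribers of every other instance listening on that channel.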
---

## Sync Service

**URL:** https://docs.realtime.dev/docs/services/sync

**Description:** Realtime document synchronization with revision tracking, partial updates, and presence.

The Sync Service provides **Twilio Sync–style** realtime document state management. Documents are stored in Redis (RedisJSON) with automatic revision tracking, concurrent writer safety, and real-time change broadcasting.

## Core Concepts

### Documents

A sync document is a JSON object identified by a service namespace and a unique ID. Every mutation increments the document's revision number, enabling optimistic concurrency control.

```typescript
interface SyncDocumentEnvelope {
  service: string;               // Namespace (e.g. "session")
  id: string;                    // Unique document ID
  revision: number;              // Auto-incrementing version
  data: Record<string, unknown>; // Document payload
  createdAt: number;             // Unix timestamp
  updatedAt: number;             // Unix timestamp
  deleted?: boolean;             // Soft-delete flag
  expiresAt?: number;            // Optional TTL
}
```

### Redis Storage

Documents are stored in RedisJSON at the following key patterns:

| Key Pattern | Purpose |
|------------|---------|
| `syncdoc:data:{service}:{id}` | Document data |
| `syncdoc:channel:{service}:{id}` | Change notification channel |
| `syncdoc:presence:{service}:{id}` | Presence tracking |

## Operations

### Create

Create a new document with optional initial data.

### Read

Fetch the current state of a document.

### Replace

Replace the entire document data.

### Merge

Apply a partial update (deep merge).

### Delete

Soft-delete a document (sets `deleted: true`).

See the [Sync Client](/docs/sdk/sync-client) reference for method signatures and examples.

## Optimistic Concurrency

All write operations support an optional `expectedRevision` parameter for optimistic concurrency control:

```typescript
// Only succeeds if the document is still at revision 3
await realtime.sync.replace('session', docId, newData, 3);
```

If the current revision doesn't match, the operation returns a conflict error (HTTP 409).
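A common client-side companion to this guard is a read-modify-write retry loop: fetch the latest revision, apply your change, and retry if a concurrent writer won the race. The sketch below is illustrative — `SyncLike` mirrors the documented `get`/`replace` signatures and `replaceWithRetry` is a hypothetical helper, not an SDK export:

```typescript
interface SyncDoc {
  revision: number;
  data: Record<string, unknown>;
}

// Minimal shape of the sync client this helper needs.
interface SyncLike {
  get(service: string, id: string): Promise<SyncDoc>;
  replace(
    service: string,
    id: string,
    data: Record<string, unknown>,
    expectedRevision?: number,
  ): Promise<SyncDoc>;
}

// Read-modify-write with optimistic retry: on a conflict, re-read the latest
// revision and re-apply the update function, up to maxAttempts times.
async function replaceWithRetry(
  sync: SyncLike,
  service: string,
  id: string,
  update: (current: Record<string, unknown>) => Record<string, unknown>,
  maxAttempts = 3,
): Promise<SyncDoc> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const doc = await sync.get(service, id); // read the latest revision
    try {
      return await sync.replace(service, id, update(doc.data), doc.revision);
    } catch (err) {
      if (attempt === maxAttempts) throw err; // give up after N conflicts
    }
  }
  throw new Error('unreachable');
}
```

Passing an `update` function (rather than a fixed payload) matters: each retry recomputes the new data from the freshly-read document, so a concurrent writer's changes are not silently overwritten.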
## Mutation Flow

Every document write follows the same internal flow: the mutation is applied, the revision is incremented, and the change is broadcast to subscribers.

## Real-Time Subscriptions

Clients receive document change events automatically when subscribed to the appropriate topic:

```typescript
realtime.subscribe({ topic: 'sync.session.updated' });

realtime.on('event:sync.session.updated', (event) => {
  console.log('Document changed:', event.payload);
  // { service, documentId, revision, data }
});
```

### Event Payload

Every sync document change event includes these payload fields:

| Field | Type | Description |
|-------|------|-------------|
| `service` | `string` | Service namespace (e.g. `"session"`) |
| `documentId` | `string` | Document ID |
| `revision` | `number` | New revision number after the change |
| `data` | `object` | Full document data (included unless `returnMode` is `'diff'`) |
| `diff` | `JsonDiff` | What changed (included when `returnMode` is `'diff'` or `'both'`) |

### Return Mode

Subscribers can control how much data each event carries using the `returnMode` option at subscribe time:

| Mode | `data` | `diff` | Use Case |
|------|:-:|:-:|---|
| `'full'` (default) | ✅ | ❌ | Full document state — backward-compatible |
| `'diff'` | ❌ | ✅ | Minimal bandwidth — only what changed |
| `'both'` | ✅ | ✅ | Full state + change context |

```typescript
// Diff-only subscription — ideal for large documents
realtime.subscribe({
  topic: 'sync.session.updated',
  returnMode: 'diff',
});
```

The diff is computed server-side by comparing the previous and new document data.
Its structure:

```typescript
interface JsonDiff {
  added: Record<string, unknown>; // New keys
  removed: string[];              // Deleted keys
  changed: Record<string, { from: unknown; to: unknown }>; // Modified keys with before/after values
}
```

For plain WebSocket clients, include `returnMode` in the subscribe message:

```json
{
  "type": "subscribe",
  "topic": "sync.session.updated",
  "returnMode": "diff"
}
```

## Presence

The Sync Service includes built-in presence tracking per document:

- Track which users are currently viewing or editing a document
- Presence data is stored in Redis and automatically cleaned up on disconnect
- Presence updates are broadcast to all subscribers

## Admin UI

Use the **Document Browser** (`/documents`) and **Document Inspector** (`/documents/inspect`) pages in the Admin UI to:

- Browse documents by service namespace
- View document data and revision history
- Merge updates to documents
- Delete documents

---

## Links

- [GitHub](https://bitbucket.org/smarterservices/realtime)
- [Support](mailto:engineering@smarterservices.com)