Event System

Diminuendo’s event system is the central nervous system of the gateway. Every agent action — streaming a text token, invoking a tool, requesting user permission, reporting token usage — flows through a structured pipeline that transforms raw Podium agent events into the gateway’s typed protocol, routes them through specialized handlers, persists the durable ones, and broadcasts them to connected clients. This page traces that pipeline from end to end.

Event Pipeline

  Podium Agent (WebSocket)
         |
         v
  PodiumEventMapper         Transforms 30+ agent events into gateway protocol
         |
         v
  Handler Dispatch           Routes by event type to 7 handler modules
         |
         v
  EventPublisher             Assigns seq, persists durable events
         |
         v
  Broadcaster                Bun native pub/sub to session/tenant topics
         |
         v
  Client WebSockets          SDK consumers, web UI, desktop app

PodiumEventMapper

The PodiumEventMapper is a pure function that transforms raw Podium agent events into the gateway’s own event protocol. A single Podium event may map to zero, one, or multiple gateway events.
export function mapPodiumEvent(
  sessionId: string,
  turnId: string,
  event: PodiumEvent,
): ClientEvent[]

Mapping Categories

The mapper handles events across six categories:
Tool lifecycle: Maps tool.call_start, tool.call_delta, tool.call, tool.result, and tool.error from Podium into corresponding gateway events. Both messageType and content.event_type are checked, providing resilience against upstream format changes.
// Example: tool.call_start
if (msgType === "tool.call_start" || eventType === "tool.call_start") {
  return [{
    type: "tool_call_start",
    sessionId, turnId,
    toolCallId: content.tool_call_id ?? "",
    toolName: content.tool_name ?? "",
    seq, ts,
  }]
}
Interactive: Maps tool.question_requested, tool.permission_requested, and tool.approval_resolved. These events drive the session state machine into the waiting state and back.
Thinking: Maps thinking.start, thinking.progress (including the legacy thinking_update type), and thinking.complete. Empty thinking content is filtered — the mapper returns an empty array rather than emitting a no-op event.
Terminal: Maps terminal.stream and terminal.complete for command execution output.
Sandbox: Maps sandbox.provisioning, sandbox.init, and sandbox.removed for sandbox lifecycle tracking.
Message flow: Maps the core message events: created / stream_start become turn_started; update / stream_update become text_delta; complete / stream_end / stream_complete become turn_complete; and error becomes turn_error. A fallback rule treats any unrecognized event with textual content as a text_delta.
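The message-flow rules, including the textual-content fallback, can be sketched as a standalone function. This is a simplified illustration, not the gateway's actual mapper: the event shapes here are minimal stand-ins, and field names beyond those described above are assumptions.

```typescript
// Simplified stand-ins for the real Podium/gateway event shapes
interface RawEvent { messageType: string; content?: { text?: string } }
interface GatewayEvent { type: string; text?: string }

function mapMessageFlow(event: RawEvent): GatewayEvent[] {
  const { messageType, content } = event
  if (messageType === "created" || messageType === "stream_start") {
    return [{ type: "turn_started" }]
  }
  if (messageType === "update" || messageType === "stream_update") {
    return [{ type: "text_delta", text: content?.text ?? "" }]
  }
  if (["complete", "stream_end", "stream_complete"].includes(messageType)) {
    return [{ type: "turn_complete" }]
  }
  if (messageType === "error") {
    return [{ type: "turn_error" }]
  }
  // Fallback: any unrecognized event that carries text becomes a text_delta
  if (typeof content?.text === "string" && content.text.length > 0) {
    return [{ type: "text_delta", text: content.text }]
  }
  // Unrecognized and textless: map to zero gateway events
  return []
}
```

The empty-array return is what allows a single Podium event to map to zero gateway events, as noted above.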

Per-Session Sequence Numbers

Every event emitted by the mapper is assigned a monotonically increasing sequence number scoped to its session:
const sessionSeqCounters = new Map<string, number>()

function nextSeq(sessionId: string): number {
  const current = sessionSeqCounters.get(sessionId) ?? 0
  const next = current + 1
  sessionSeqCounters.set(sessionId, next)
  return next
}
Sequence numbers are per-session, not global. This enables clients to request events after a specific seq via the afterSeq parameter on join_session, and to detect gaps if events arrive out of order.
When a session is deleted, resetSessionSeq(sessionId) clears its counter. This prevents stale counters from accumulating memory for sessions that no longer exist.
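The counter semantics can be demonstrated end to end. This is a self-contained restatement of the counter map shown above, with a resetSessionSeq sketched under the assumption that it simply deletes the session's entry:

```typescript
// Per-session monotonic counters, keyed by session id
const counters = new Map<string, number>()

function nextSeq(sessionId: string): number {
  const next = (counters.get(sessionId) ?? 0) + 1
  counters.set(sessionId, next)
  return next
}

// Assumed behavior: dropping the entry frees the memory held for a
// deleted session; a recreated session would start again from seq 1.
function resetSessionSeq(sessionId: string): void {
  counters.delete(sessionId)
}
```

Because counters are keyed by session id, two sessions advance independently: nextSeq("a") and nextSeq("b") each start from 1.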

Handler Decomposition

After the mapper produces gateway events, each event is routed by type to one of seven specialized handler modules. Every handler is a pure function with the signature (ctx: EventHandlerContext, event?) => Effect<void>:
Module            Events Handled                                                Responsibility
message-complete  turn_complete, turn_error                                     Persists accumulated text, transitions state, settles billing
tool-lifecycle    tool_call_start, tool_call, tool_result, tool_error           Tracks pending/completed tool calls in ConnectionState
interactive       question_requested, permission_requested, approval_resolved   Manages deferred interactive messages, transitions to/from waiting
thinking          thinking_start, thinking_progress, thinking_complete          Tracks thinking state in ConnectionState refs
terminal          terminal_stream, terminal_complete                            Forwards terminal output (handled inline)
sandbox           sandbox_provisioning, sandbox_ready, sandbox_removed          Tracks sandbox lifecycle
inline            text_delta, session_state                                     Text accumulation and session state transitions handled directly in the dispatcher

Dispatch Implementation

The dispatchToHandler function in MessageRouterLive.ts is a switch over the event type string:
const dispatchToHandler = (ctx: EventHandlerContext, clientEvent: ClientEvent, turnId: string, rawEvent: PodiumEvent) =>
  Effect.gen(function* () {
    switch (clientEvent.type) {
      case "turn_complete":
        yield* handleTurnComplete(ctx, turnId, rawEvent)
        break
      case "turn_error":
        yield* handleTurnError(ctx, turnId)
        break
      case "tool_call_start":
        yield* handleToolCallStart(ctx, clientEvent)
        break
      // ... remaining cases
      case "text_delta": {
        // Capture into a local const so the string narrowing survives the closure
        const text = clientEvent.text
        if (typeof text === "string") {
          yield* Ref.update(ctx.cs.fullContent, (t) => t + text)
        }
        break
      }
    }
  })

EventHandlerContext

The EventHandlerContext is a struct threaded through every handler. It decouples handlers from infrastructure by providing method handles rather than direct service references:
export interface EventHandlerContext {
  readonly sessionId: string
  readonly tenantId: string
  readonly cs: ConnectionState

  readonly publishEvent: (event: unknown) => Effect.Effect<void>
  readonly persistEvent: (seq: number, eventType: string, data: unknown) => void
  readonly transitionState: (newState: SessionState) => Effect.Effect<void>
  readonly publishTenantEvent: (event: unknown) => Effect.Effect<void>
  readonly writeMessage: (params: { id: string; turnId: string | null; role: "user" | "assistant"; content: string; createdAt: number }) => void
  readonly settleCredits: (params: { reservation: CreditReservation; actual: UsageRecord }) => Effect.Effect<void>
}
This design means handler modules have zero imports from infrastructure layers (Broadcaster, WorkerManager, BillingService). They interact exclusively through the context interface, making them trivially testable with stub implementations.
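A test double for the context follows directly from this design. The sketch below simplifies the Effect.Effect<void> return types to plain void-returning functions for brevity, and records every call so a test can assert on what a handler did; the field and method names mirror the interface above, but the recording structure is illustrative:

```typescript
// Records what a handler did through the context, so tests can assert on it
interface RecordedCalls {
  published: unknown[]
  persisted: Array<{ seq: number; eventType: string }>
  transitions: string[]
}

function makeStubContext() {
  const calls: RecordedCalls = { published: [], persisted: [], transitions: [] }
  return {
    calls,
    sessionId: "test-session",
    tenantId: "test-tenant",
    publishEvent: (event: unknown) => { calls.published.push(event) },
    persistEvent: (seq: number, eventType: string, _data: unknown) => {
      calls.persisted.push({ seq, eventType })
    },
    transitionState: (newState: string) => { calls.transitions.push(newState) },
  }
}
```

A handler exercised against this stub can then be verified purely by inspecting `calls`, with no broker, worker, or billing service in the test.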

Ephemeral vs. Persistent Events

Not all events need to survive a restart. The gateway classifies events into two categories: ephemeral events, which are broadcast but never stored, and persistent events, which are also written to SQLite so they can be replayed after a reconnect.
Ephemeral events are broadcast to connected clients but never written to SQLite. These are high-frequency, transient signals that would bloat storage without providing replay value:
export const EPHEMERAL_EVENT_TYPES: ReadonlySet<EventType> = new Set([
  "connected",
  "heartbeat",
  "text_delta",
  "message.delta",
  "thinking.progress",
  "terminal.stream",
  "tool.call_delta",
  "pong",
  "welcome",
  "gap",
  "replay_complete",
  "usage.context",
  "stream_snapshot",
])
The distinction between ephemeral and persistent events is central to replay correctness. When a client reconnects and requests events after a given seq, it receives only persistent events. Text deltas are not replayed individually — instead, the turn_complete event carries the finalText field with the complete accumulated response.

The Event Envelope

Every event sent to clients conforms to the EventEnvelope structure defined in events.ts:
export interface EventEnvelope<T extends EventType = EventType> {
  readonly event_id: string
  readonly type: T
  readonly sequence_number: number
  readonly session_id: string | null
  readonly ts: number
  readonly trace_id: string | null
  readonly data: EventDataMap[T]
}
The EventDataMap is a mapped type that associates each of the 51 event types with its specific data interface, enabling fully typed event handling on both the server and in client SDKs.
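The mapped-type pattern can be illustrated with a reduced example. The three event types and payload fields below are stand-ins, not the gateway's actual 51-entry map; the point is that narrowing the type parameter narrows the data field along with it:

```typescript
// A reduced, illustrative version of the gateway's EventDataMap pattern
interface EventDataMap {
  heartbeat: { ts: number }
  text_delta: { text: string }
  turn_complete: { finalText: string }
}

type EventType = keyof EventDataMap

interface EventEnvelope<T extends EventType = EventType> {
  readonly type: T
  readonly data: EventDataMap[T]
}

// Because T is "text_delta", e.data is statically known to be { text: string }
function handleTextDelta(e: EventEnvelope<"text_delta">): string {
  return e.data.text
}
```

On the receiving side, a switch on `envelope.type` narrows `envelope.data` the same way, which is what gives both the server and the client SDKs fully typed event handling.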

The 51-Type Event Protocol

The gateway defines 51 event types spanning ten categories:

Connection Lifecycle

connected, heartbeat

Session Lifecycle

session_created, session_updated, session_archived, session_unarchived, session_deleted, session_state

Turn Lifecycle

turn_started, turn_complete, turn_error

Message Streaming

message.delta, message.complete, text_delta

Tool Lifecycle

tool.call_start, tool.call_delta, tool.call, tool.result, tool.error

Interactive

tool.question_requested, tool.permission_requested, tool.approval_resolved

Thinking & Terminal

thinking.start, thinking.progress, thinking.complete, terminal.stream, terminal.complete

Sandbox

sandbox.init, sandbox.provisioning, sandbox.ready, sandbox.removed

State & Reliability

state_snapshot, stream_snapshot, gap, replay_complete, steer_sent, stop_acknowledged

System & Auth

welcome, authenticated, session_list, pong, error, server_shutdown, usage.update, usage.context, file_list, file_content, file_changed, file_history_result, history, events

Broadcaster

The Broadcaster is an Effect service that wraps Bun’s native WebSocket pub/sub with topic lifecycle tracking. It provides two primary publishing methods:
readonly sessionEvent: (sessionId: string, event: unknown) => Effect.Effect<void>
readonly tenantEvent: (tenantId: string, event: unknown) => Effect.Effect<void>

Topic Naming Convention

Scope     Topic Pattern                Subscribers
Session   session:{sessionId}          All clients that have joined this session
Tenant    tenant:{tenantId}:sessions   All authenticated clients for this tenant

Topic Tracking

The Broadcaster maintains a Set<string> of all topics that have received at least one publish. This enables two critical features:
  1. Graceful shutdown: broadcastShutdown(reason) iterates over all known topics and publishes a server_shutdown event to each, ensuring every connected client is notified before the server terminates.
  2. Topic cleanup: forgetTopic(topic) removes a topic from the tracking set when the last subscriber disconnects, preventing unbounded memory growth.

Heartbeat

A 30-second heartbeat timer publishes a heartbeat event to every active session topic. Clients use this to detect stale connections and trigger reconnection logic.
const heartbeatTimer = setInterval(() => {
  const heartbeat = JSON.stringify({ type: "heartbeat", ts: Date.now() })
  for (const sessionId of activeSessionIds) {
    server.publish(`session:${sessionId}`, heartbeat)
  }
}, HEARTBEAT_INTERVAL_MS)

State Snapshots for Late Joiners

When a client sends join_session, the gateway does not replay the entire event history. Instead, it constructs a state_snapshot — a point-in-time view of the session’s current state:
{
  type: "state_snapshot",
  sessionId: "...",
  session: { /* SessionMeta */ },
  currentTurn: {
    turnId: "...",
    textSoFar: "accumulated text from fullContent ref",
    startedAt: 1234567890
  } | null,
  recentHistory: [ /* last 50 messages */ ],
  subscriberCount: 3,
  sandbox: null
}
The textSoFar field is read from the active session’s ConnectionState.fullContent ref — the same ref that the text_delta handler accumulates into. This means a client joining mid-stream immediately sees all text generated so far, without needing to replay individual deltas.
The stream_snapshot event type exists for an even more detailed mid-stream view, including thinking content and pending tool calls. It is classified as ephemeral and not persisted.

Event Persistence Flow

The persistence path for a durable event follows this sequence:
1. Mapper assigns seq: mapPodiumEvent calls nextSeq(sessionId) to get a monotonic sequence number.
2. Broadcast to subscribers: the event is published to the session:{sessionId} topic via the Broadcaster.
3. Check persistence set: the event type is checked against PERSISTENT_EVENT_TYPES.
4. Write to SQLite: if persistent, persistEvent(sessionId, seq, eventType, data) fires a write command to the writer worker. The data is JSON-stringified. This is a fire-and-forget operation.
5. Handler dispatch: the event is dispatched to its handler module for side effects (state transitions, billing settlement, etc.).
Events are broadcast before they are persisted. This is an intentional trade-off: it minimizes latency for connected clients at the cost of a small window where a crash could lose the most recent persistent event. The writer worker’s batching (50ms / 100 commands) bounds this window.
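The broadcast-first ordering can be sketched as a single function. The callback names here are illustrative stand-ins for the Broadcaster publish and the writer-worker command, not the gateway's actual API:

```typescript
function publishDurableEvent(
  broadcast: (topic: string, payload: string) => void,
  persist: (seq: number, type: string, data: string) => void, // fire-and-forget
  isPersistent: (type: string) => boolean,
  sessionId: string,
  seq: number,
  type: string,
  data: unknown,
): void {
  const payload = JSON.stringify({ type, seq, data })
  // Broadcast first: connected clients see the event with minimal latency
  broadcast(`session:${sessionId}`, payload)
  // Then persist, only if the type is in the persistent set
  if (isPersistent(type)) {
    persist(seq, type, JSON.stringify(data))
  }
}
```

A crash between the two calls is the small loss window described above; the writer worker's batching bounds how large that window can get.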

Gap Detection and Replay

Clients can detect missing events by tracking the last received seq and comparing it to incoming events. If a gap is detected, the gateway supports two mechanisms:
  1. get_events message: Clients send { type: "get_events", sessionId, afterSeq, limit } to request events from the persistent store. The reader worker queries SELECT * FROM events WHERE seq > ? LIMIT ?.
  2. gap event: The server can proactively emit a gap event with fromSeq and toSeq fields, signaling to clients that events in that range may have been lost (e.g., due to a Podium connection drop).
The replay_complete event signals that the server has finished sending replayed events, and the client can resume processing real-time events.
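The client side of this protocol can be sketched as a small seq tracker. This is an assumed consumer pattern, not gateway SDK code: requestReplay stands in for sending a get_events message with afterSeq set to the last seq the client actually received:

```typescript
function makeGapDetector(requestReplay: (afterSeq: number) => void) {
  let lastSeq = 0
  return (event: { seq: number }): void => {
    if (event.seq > lastSeq + 1) {
      // Events between lastSeq and event.seq were missed: ask for a replay
      // of everything after the last seq we actually saw
      requestReplay(lastSeq)
    }
    if (event.seq > lastSeq) lastSeq = event.seq
  }
}
```

A real client would also suppress duplicate replay requests while one is in flight and resume normal tracking once replay_complete arrives.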