Effect TS Patterns

Diminuendo is built entirely on Effect, a TypeScript library that replaces ad-hoc error handling, manual dependency wiring, and unstructured concurrency with a principled, type-safe programming model. This page explains the specific Effect patterns used in the gateway, why each one matters, and how they appear in the actual source code.

Why Effect TS

Traditional TypeScript applications suffer from four pervasive problems:
  1. Invisible errors: functions throw exceptions that are not reflected in their type signatures. Callers must remember to try/catch, and TypeScript cannot verify that they do.
  2. Hidden dependencies: modules import singletons (database connections, configuration, loggers) directly, making them impossible to test in isolation or replace at runtime.
  3. Unstructured concurrency: Promise.all provides no way to cancel in-flight operations when one fails, and orphaned promises leak memory and cause unexpected side effects.
  4. Manual resource management: database connections, file handles, and timers must be manually cleaned up in finally blocks, which are easy to forget and hard to compose.
Effect solves all four problems in a single, composable abstraction: Effect<Success, Error, Requirements>. The three type parameters make success, failure, and dependency requirements explicit at the type level.
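As a minimal sketch of how the three type parameters read in practice (the Env service, InvalidPort error, and readPort effect are hypothetical names used only for illustration, not part of the gateway):

```typescript
import { Context, Data, Effect } from "effect"

// Hypothetical error and service, for illustration only.
class InvalidPort extends Data.TaggedError("InvalidPort")<{
  readonly raw: string
}> {}

class Env extends Context.Tag("Env")<
  Env,
  { readonly get: (key: string) => string | undefined }
>() {}

// Success = number, Error = InvalidPort, Requirements = Env.
const readPort: Effect.Effect<number, InvalidPort, Env> = Effect.gen(function* () {
  const env = yield* Env
  const raw = env.get("PORT") ?? ""
  const parsed = Number(raw)
  if (!Number.isInteger(parsed) || parsed <= 0) {
    return yield* Effect.fail(new InvalidPort({ raw }))
  }
  return parsed
})

// Providing Env removes it from Requirements; the error stays in the type.
const runnable: Effect.Effect<number, InvalidPort> = readPort.pipe(
  Effect.provideService(Env, {
    get: (key) => (key === "PORT" ? "8080" : undefined),
  })
)

const port = Effect.runSync(runnable)
```

The annotation on runnable would stop compiling if the Env requirement were left unprovided, which is the kind of check the type parameters are meant to give.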

Layers as Dependency Injection

Every service in Diminuendo is defined as a Context.Tag — a typed key that identifies a service interface — with a corresponding Live implementation wrapped in a Layer.

Defining a Service

The AuthService is a typical example. The tag declares the interface; the live implementation is a Layer that requires AppConfig:
// src/auth/AuthService.ts
import { Context, Effect, Layer } from "effect"
import { AppConfig } from "../config/AppConfig.js"
import { Unauthenticated } from "../errors.js"

export interface AuthIdentity {
  readonly userId: string
  readonly email: string
  readonly tenantId: string
}

export class AuthService extends Context.Tag("AuthService")<
  AuthService,
  { readonly authenticate: (token: string | null) => Effect.Effect<AuthIdentity, Unauthenticated> }
>() {}

export const AuthServiceLive = Layer.effect(
  AuthService,
  Effect.gen(function* () {
    const config = yield* AppConfig

    if (config.devMode) {
      return {
        authenticate: (_token: string | null) => Effect.succeed({
          userId: "dev-user-001",
          email: "developer@example.com",
          tenantId: "dev",
        }),
      }
    }

    // Production: JWT verification via JWKS endpoint
    // ...
  })
)
The key insight is that AuthServiceLive imports only the AppConfig tag, not a concrete configuration object. It declares AppConfig as a requirement, and the Effect runtime supplies the implementation at startup via the Layer graph.
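One consequence is that a test can swap the configuration without touching the service code. The sketch below assumes trimmed-down stand-ins for AppConfig and AuthService (the real shapes have more fields, and the token handling here is invented for the demo):

```typescript
import { Context, Effect, Layer } from "effect"

// Simplified stand-ins for the gateway's AppConfig and AuthService.
class AppConfig extends Context.Tag("AppConfig")<
  AppConfig,
  { readonly devMode: boolean }
>() {}

class AuthService extends Context.Tag("AuthService")<
  AuthService,
  { readonly authenticate: (token: string | null) => Effect.Effect<string, Error> }
>() {}

const AuthServiceLive = Layer.effect(
  AuthService,
  Effect.gen(function* () {
    const config = yield* AppConfig
    return {
      authenticate: (token: string | null): Effect.Effect<string, Error> =>
        config.devMode
          ? Effect.succeed("dev-user-001")
          : token === null
            ? Effect.fail(new Error("missing token"))
            : Effect.succeed(`user-for-${token}`),
    }
  })
)

// A test provides its own configuration layer instead of the live one.
const TestConfig = Layer.succeed(AppConfig, { devMode: true })

const program = Effect.gen(function* () {
  const auth = yield* AuthService
  return yield* auth.authenticate(null)
})

const userIdPromise = Effect.runPromise(
  program.pipe(Effect.provide(AuthServiceLive.pipe(Layer.provide(TestConfig))))
)
```

Replacing TestConfig with a layer where devMode is false would exercise the other branch, again without editing AuthServiceLive.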

Composing the Layer Graph

In src/main.ts, all layers are composed into a single dependency tree:
// src/main.ts
const RegistryLayer = SessionRegistryServiceLive.pipe(Layer.provide(AppConfigLive))
const PodiumLayer = PodiumClientLive.pipe(Layer.provide(AppConfigLive))
const AuthLayer = AuthServiceLive.pipe(Layer.provide(AppConfigLive))
const BroadcastLayer = BroadcasterLive
const WorkerLayer = WorkerManagerLive.pipe(Layer.provide(AppConfigLive))
const BillingLayer = BillingServiceUnlimitedLive.pipe(
  Layer.provide(Layer.mergeAll(AppConfigLive, WorkerLayer))
)
const MembershipLayer = MembershipServiceLive.pipe(Layer.provide(AppConfigLive))

const RouterDeps = Layer.mergeAll(
  RegistryLayer, PodiumLayer, AppConfigLive,
  BroadcastLayer, BillingLayer, WorkerLayer, MembershipLayer,
)
const RouterLayer = MessageRouterLive.pipe(Layer.provide(RouterDeps))

const AppLayer = Layer.mergeAll(
  AppConfigLive, AuthLayer, RegistryLayer, PodiumLayer,
  BroadcastLayer, BillingLayer, EnsembleLayer,
  MembershipLayer, WorkerLayer, RouterLayer,
)
Layer.mergeAll combines independent layers in parallel. Layer.provide chains dependent layers in sequence. The Effect runtime resolves the full dependency graph, constructs each service exactly once, and provides it to the program.
This composition makes the dependency structure explicit and visible in one place. There is no service locator, no global registry, no runtime reflection. If a layer requires a dependency that is not provided, it is a compile-time type error.
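The "constructed exactly once" behavior can be observed with a small experiment. This is a sketch with hypothetical services A, B, and C, where the same layer reference is provided to two dependents, mirroring how AppConfigLive is provided to several layers above:

```typescript
import { Context, Effect, Layer } from "effect"

// Hypothetical services: B and C both depend on A.
class A extends Context.Tag("A")<A, { readonly id: number }>() {}
class B extends Context.Tag("B")<B, { readonly aId: number }>() {}
class C extends Context.Tag("C")<C, { readonly aId: number }>() {}

// Counts how many times the A layer is actually built.
let constructions = 0

const ALive = Layer.effect(A, Effect.sync(() => ({ id: ++constructions })))

const BLive = Layer.effect(
  B,
  Effect.gen(function* () {
    const a = yield* A
    return { aId: a.id }
  })
)

const CLive = Layer.effect(
  C,
  Effect.gen(function* () {
    const a = yield* A
    return { aId: a.id }
  })
)

// ALive appears twice in the graph, but it is the same layer reference.
const DemoLayer = Layer.mergeAll(
  BLive.pipe(Layer.provide(ALive)),
  CLive.pipe(Layer.provide(ALive)),
)

const program = Effect.gen(function* () {
  const b = yield* B
  const c = yield* C
  return { b: b.aId, c: c.aId, constructions }
})

const built = Effect.runPromise(program.pipe(Effect.provide(DemoLayer)))
```

Sharing works by layer reference, so building a fresh layer value at each use site would not be deduplicated in the same way.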

Typed Errors

Diminuendo uses Data.TaggedError to define a closed set of domain errors. Each error is a branded class with a _tag discriminant:
// src/errors.ts
import { Data } from "effect"

export class Unauthenticated extends Data.TaggedError("Unauthenticated")<{
  readonly reason: string
}> {}

export class SessionNotFound extends Data.TaggedError("SessionNotFound")<{
  readonly sessionId: string
}> {}

export class PodiumConnectionError extends Data.TaggedError("PodiumConnectionError")<{
  readonly message: string
  readonly cause?: unknown
}> {}

export class DbError extends Data.TaggedError("DbError")<{
  readonly message: string
  readonly cause?: unknown
}> {}
These errors appear in the type signatures of the functions that can produce them. For example, PodiumClient.connect returns Effect.Effect<PodiumConnection, PodiumConnectionError> — the caller knows exactly what can go wrong and must handle it explicitly.
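Because each error carries a _tag, a caller can handle one failure and leave the rest in the type. The following is a minimal sketch with a hypothetical getSession function; the error classes are simplified copies of the ones above:

```typescript
import { Data, Effect, Either } from "effect"

class SessionNotFound extends Data.TaggedError("SessionNotFound")<{
  readonly sessionId: string
}> {}

class DbError extends Data.TaggedError("DbError")<{
  readonly message: string
}> {}

// Hypothetical lookup that can fail in two distinct, typed ways.
const getSession = (id: string): Effect.Effect<string, SessionNotFound | DbError> =>
  id === "s1"
    ? Effect.succeed("session s1")
    : id === "boom"
      ? Effect.fail(new DbError({ message: "connection reset" }))
      : Effect.fail(new SessionNotFound({ sessionId: id }))

// Handling one tag removes it from the error channel: only DbError remains.
const withFallback = (id: string): Effect.Effect<string, DbError> =>
  getSession(id).pipe(
    Effect.catchTag("SessionNotFound", (e) =>
      Effect.succeed(`no session ${e.sessionId}`)
    )
  )

const found = Effect.runSync(withFallback("s1"))
const missing = Effect.runSync(withFallback("nope"))

// Effect.either turns the remaining failure into a value we can inspect.
const failed = Effect.runSync(Effect.either(withFallback("boom")))
const failedTag = Either.isLeft(failed) ? failed.left._tag : "none"
```

The return type annotation on withFallback is the point of the example: the compiler accepts it only because the SessionNotFound case has been handled.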

Error Propagation Without Try/Catch

In the MessageRouter, errors propagate through Effect.gen without any try/catch blocks:
// src/session/MessageRouterLive.ts (simplified)
case "run_turn": {
  // Each of these can fail with a typed error.
  // If any fails, the error propagates to the outer catchAll.
  const canProceed = yield* billing.canProceed(identity.tenantId)
  const session = yield* registry.get(identity.tenantId, message.sessionId)
  const active = yield* ensurePodiumConnection(
    message.sessionId, identity, session.agentType
  )
  yield* active.connection.sendMessage(message.text, { ... })
  return { kind: "broadcast" as const }
}
The outer catchAll at the bottom of the route function maps every typed error to a safe client-facing error response:
.pipe(
  Effect.catchAll((err) =>
    Effect.gen(function* () {
      yield* Effect.logError(`Route error: ${err}`)
      const code = (err as { _tag?: string })._tag ?? "INTERNAL_ERROR"
      const safeMessages: Record<string, string> = {
        Unauthenticated: "Authentication required",
        SessionNotFound: "Session not found",
        PodiumConnectionError: "Failed to connect to agent",
        DbError: "Database operation failed",
      }
      return {
        kind: "respond" as const,
        data: {
          type: "error",
          code,
          message: safeMessages[code] ?? sanitizeErrorMessage(err),
        },
      }
    })
  )
)
The sanitizeErrorMessage function strips internal details from error messages before they reach clients. This prevents information leakage while preserving useful error codes for debugging.

Streams for Event Processing

The Podium agent produces a stream of events (thinking progress, tool calls, text deltas, terminal output) over a WebSocket connection. Diminuendo models this as an Effect Stream and consumes it in a daemon fiber.

Creating the Event Stream

PodiumClient.connect() creates an unbounded Queue, feeds incoming WebSocket messages into it, and exposes it as a Stream:
// src/upstream/PodiumClient.ts (simplified)
const eventQueue = yield* Queue.unbounded<PodiumEvent>()

ws.addEventListener("message", (ev) => {
  const json = JSON.parse(ev.data)
  const event: PodiumEvent = {
    messageType: json.type ?? json.message_type ?? "",
    content: json.content,
    agentId: json.agent_id,
  }
  Effect.runSync(Queue.offer(eventQueue, event))
})

ws.addEventListener("close", () => {
  Effect.runSync(Queue.shutdown(eventQueue))
})

const connection: PodiumConnection = {
  events: Stream.fromQueue(eventQueue),
  // ...
}
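The queue-to-stream handoff can be tried in isolation. This sketch replaces the WebSocket with three direct Queue.offer calls and uses a hypothetical, trimmed-down event shape:

```typescript
import { Chunk, Effect, Queue, Stream } from "effect"

// Hypothetical, trimmed-down event shape.
interface DemoEvent {
  readonly messageType: string
}

const program = Effect.gen(function* () {
  const queue = yield* Queue.unbounded<DemoEvent>()

  // Stand-in for the WebSocket "message" callback: push events as they arrive.
  yield* Queue.offer(queue, { messageType: "thinking" })
  yield* Queue.offer(queue, { messageType: "text_delta" })
  yield* Queue.offer(queue, { messageType: "done" })

  // Pull the three queued events back out through the Stream interface.
  const events = yield* Stream.fromQueue(queue).pipe(
    Stream.take(3),
    Stream.runCollect,
  )

  return Chunk.toReadonlyArray(events).map((e) => e.messageType)
})

const received = Effect.runPromise(program)
```

Stream.take(3) is there only so the demo terminates; in the gateway the stream ends when the queue is shut down on WebSocket close.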

Consuming the Stream

MessageRouterLive consumes the stream with Stream.runForEach, forked as a daemon fiber so it outlives the originating request:
// src/session/MessageRouterLive.ts
const startEventStreamFiber = (
  sessionId: string,
  tenantId: string,
  connection: PodiumConnection,
  cs: ConnectionState,
) => {
  const ctx = makeHandlerContext(sessionId, tenantId, cs)

  return connection.events.pipe(
    Stream.runForEach((event) =>
      Effect.gen(function* () {
        const turnId = (yield* Ref.get(cs.turnId)) ?? "unknown"
        const clientEvents = mapPodiumEvent(sessionId, turnId, event)

        for (const clientEvent of clientEvents) {
          yield* broadcaster.sessionEvent(sessionId, clientEvent)
          yield* dispatchToHandler(ctx, clientEvent, turnId, event)
        }
      })
    ),
    Effect.tap(() =>
      Effect.gen(function* () {
        yield* Ref.update(activeSessionsRef, HashMap.remove(sessionId))
        yield* transitionSessionState(tenantId, sessionId, cs, "inactive")
      })
    ),
    Effect.catchAll((err) =>
      Effect.gen(function* () {
        yield* Effect.logError(`Event stream error: ${err}`)
        yield* broadcaster.sessionEvent(sessionId, {
          type: "turn_error",
          message: "Lost connection to agent",
          code: "AGENT_DISCONNECTED",
          // ...
        })
        yield* transitionSessionState(tenantId, sessionId, cs, "error")
      })
    ),
  )
}

// Forked as a daemon fiber — runs independently of the request fiber
const eventFiber = yield* Effect.forkDaemon(
  startEventStreamFiber(sessionId, identity.tenantId, connection, cs)
)
The daemon fiber runs until the stream completes (agent disconnects), errors out (network failure), or is explicitly interrupted (session deletion). In the deletion case, the router interrupts the fiber directly:
case "delete_session": {
  // ...
  const active = HashMap.get(sessions, message.sessionId)
  if (active._tag === "Some") {
    yield* Fiber.interrupt(active.value.eventFiber)
    yield* active.value.connection.close
  }
  // ...
}
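The interruption path can be sketched without any networking. Here Effect.never stands in for the stream consumer, and the cleanedUp flag is a hypothetical marker showing that interruption finalizers run before Fiber.interrupt returns:

```typescript
import { Effect, Exit, Fiber, Ref } from "effect"

const program = Effect.gen(function* () {
  const cleanedUp = yield* Ref.make(false)

  // Stand-in for the stream consumer: runs until it is interrupted.
  const consumer = Effect.never.pipe(
    Effect.onInterrupt(() => Ref.set(cleanedUp, true))
  )

  // A daemon fiber is not tied to the lifetime of the fiber that forked it.
  const fiber = yield* Effect.forkDaemon(consumer)
  yield* Effect.sleep("10 millis") // give the fiber time to start

  // Fiber.interrupt resumes once the fiber has finished shutting down.
  const exit = yield* Fiber.interrupt(fiber)

  return {
    interrupted: Exit.isInterrupted(exit),
    cleanedUp: yield* Ref.get(cleanedUp),
  }
})

const outcome = Effect.runPromise(program)
```

Because a daemon fiber is not cleaned up with its parent, the code that forks it has to keep the fiber handle, as the router does with eventFiber.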

Refs for Mutable State

Effect’s Ref provides thread-safe mutable state with atomic read-modify-write operations. Diminuendo uses Refs extensively for per-connection state that changes during a turn.

ConnectionState

Each active session maintains a ConnectionState — a struct of Refs tracking the current turn, accumulated text, pending tool calls, thinking state, billing reservation, and more:
// src/session/ConnectionState.ts
export interface ConnectionState {
  readonly turnId: Ref.Ref<string | null>
  readonly fullContent: Ref.Ref<string>
  readonly stopRequested: Ref.Ref<boolean>
  readonly turnStopped: Ref.Ref<boolean>

  readonly pendingToolCalls: Ref.Ref<Map<string, { toolName: string; startedAt: number }>>
  readonly completedToolIds: Ref.Ref<Set<string>>

  readonly isThinking: Ref.Ref<boolean>
  readonly thinkingContent: Ref.Ref<string>

  readonly currentReservation: Ref.Ref<CreditReservation | null>
  readonly sessionState: Ref.Ref<SessionState>
  // ... 15 refs total
}
The factory function creates all Refs with their initial values:
export const makeConnectionState = (): Effect.Effect<ConnectionState> =>
  Effect.gen(function* () {
    return {
      turnId: yield* Ref.make<string | null>(null),
      fullContent: yield* Ref.make(""),
      stopRequested: yield* Ref.make(false),
      pendingToolCalls: yield* Ref.make(new Map()),
      sessionState: yield* Ref.make<SessionState>("inactive"),
      // ...
    }
  })

Atomic State Updates

Ref.update guarantees an atomic read-modify-write, which matters whenever more than one fiber can touch the same Ref (the event stream fiber and a request fiber, say). Accumulating text content is a typical case:
// Inside the event handler for text_delta
yield* Ref.update(ctx.cs.fullContent, (text) => text + clientEvent.text)
And the active sessions map uses HashMap (an immutable persistent data structure) inside a Ref for concurrent-safe updates:
const activeSessionsRef = yield* Ref.make(HashMap.empty<string, ActiveSession>())

// Adding a session
yield* Ref.update(activeSessionsRef, HashMap.set(sessionId, activeSession))

// Removing a session
yield* Ref.update(activeSessionsRef, HashMap.remove(sessionId))

// Looking up a session
const sessions = yield* Ref.get(activeSessionsRef)
const existing = HashMap.get(sessions, sessionId)
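A small experiment illustrates the guarantee. This sketch, which is not taken from the gateway, has 100 fibers append to the same Ref concurrently and checks that no update is lost:

```typescript
import { Effect, Ref } from "effect"

const program = Effect.gen(function* () {
  const fullContent = yield* Ref.make("")

  // 100 fibers append one character each, all running concurrently.
  yield* Effect.forEach(
    Array.from({ length: 100 }, (_, i) => i),
    () => Ref.update(fullContent, (text) => text + "x"),
    { concurrency: "unbounded", discard: true },
  )

  const text = yield* Ref.get(fullContent)
  return text.length
})

const finalLength = Effect.runPromise(program)
```

A separate Ref.get followed by Ref.set would not offer the same guarantee if another fiber ran between the two calls.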

Effect.gen for Sequential Composition

Effect’s generator syntax (Effect.gen(function* () { ... })) provides readable sequential composition without callback nesting or promise chains. Each yield* suspends execution until the effect completes, and failures short-circuit through the generator. A representative example from the run_turn handler:
case "run_turn": {
  // 1. Check billing
  const canProceed = yield* billing.canProceed(identity.tenantId)
  if (!canProceed) {
    return { kind: "respond", data: { type: "error", code: "INSUFFICIENT_CREDITS", ... } }
  }

  // 2. Reserve credits
  const reservation = yield* billing.reserveCredits({
    tenantId: identity.tenantId,
    estimatedCostMicroDollars: 50_000,
  })

  // 3. Fetch session metadata
  const session = yield* registry.get(identity.tenantId, message.sessionId)

  // 4. Update session status
  yield* registry.updateStatus(identity.tenantId, message.sessionId, "running")

  // 5. Persist user message
  workers.write({ type: "insert_message", sessionId: message.sessionId, ... })

  // 6. Connect to agent
  const active = yield* ensurePodiumConnection(
    message.sessionId, identity, session.agentType
  )

  // 7. Reset turn state and send message
  yield* resetTurnState(active.state)
  yield* Ref.set(active.state.turnId, turnId)
  yield* Ref.set(active.state.currentReservation, reservation)
  yield* active.connection.sendMessage(message.text, { ... })

  return { kind: "broadcast" }
}
Each step reads linearly, and if any yield* fails (session not found, Podium connection error, database error), execution jumps to the outer catchAll handler. No intermediate try/catch is needed.
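The short-circuit behavior is easy to verify in miniature. In this sketch the step helper and fetchSession are hypothetical; step records which stages actually ran:

```typescript
import { Data, Effect, Either } from "effect"

class SessionNotFound extends Data.TaggedError("SessionNotFound")<{
  readonly sessionId: string
}> {}

// Records which steps actually executed.
const steps: Array<string> = []

// Hypothetical step helper that logs its name when it runs.
const step = (name: string) =>
  Effect.sync(() => {
    steps.push(name)
  })

// Hypothetical lookup that only knows one session id.
const fetchSession = (id: string): Effect.Effect<string, SessionNotFound> =>
  id === "known"
    ? Effect.succeed(id)
    : Effect.fail(new SessionNotFound({ sessionId: id }))

const handler = Effect.gen(function* () {
  yield* step("check billing")
  const session = yield* fetchSession("missing") // fails here
  yield* step(`connect to agent for ${session}`) // never reached
  return "broadcast"
})

const result = Effect.runSync(Effect.either(handler))
const failedWith = Either.isLeft(result) ? result.left._tag : "none"
```

After running, steps contains only the first entry: the failure from fetchSession skipped everything after it, with no try/catch in the handler.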

Schedule for Retry and Backoff

Effect’s Schedule combinator provides composable retry policies. Diminuendo defines two policies for its upstream service connections:
// src/resilience/RetryPolicy.ts
import { Schedule } from "effect"

/** Podium retry: 500ms base exponential with jitter, 3 retries max */
export const podiumRetry = Schedule.exponential("500 millis").pipe(
  Schedule.jittered,
  Schedule.compose(Schedule.recurs(3)),
)

/** Ensemble retry: 1s base exponential with jitter, 2 retries max */
export const ensembleRetry = Schedule.exponential("1 seconds").pipe(
  Schedule.jittered,
  Schedule.compose(Schedule.recurs(2)),
)
Without jitter, all retrying clients would hit the backend simultaneously after each exponential delay, creating a thundering herd effect. Schedule.jittered adds random variance to each delay, spreading retries over time and reducing peak load on the recovering service. This is especially important for Podium connections, where multiple sessions may disconnect simultaneously during a backend restart.
These schedules compose with any effectful operation via Effect.retry:
yield* podium.createInstance(params).pipe(
  Effect.retry(podiumRetry),
)
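To see how many attempts such a policy allows, the sketch below uses the same shape as podiumRetry with a shorter base delay (an assumption made so the demo finishes quickly) and a hypothetical call that always fails:

```typescript
import { Effect, Either, Schedule } from "effect"

// Same shape as podiumRetry, with a 10ms base so the demo finishes quickly.
const demoRetry = Schedule.exponential("10 millis").pipe(
  Schedule.jittered,
  Schedule.compose(Schedule.recurs(3)),
)

let attempts = 0

// Hypothetical upstream call that always fails.
const alwaysFails = Effect.suspend(() => {
  attempts += 1
  return Effect.fail(new Error(`attempt ${attempts} failed`))
})

// Effect.either keeps the final failure as a value instead of rejecting.
const retried = Effect.runPromise(
  Effect.either(alwaysFails.pipe(Effect.retry(demoRetry)))
).then((result) => ({ attempts, failed: Either.isLeft(result) }))
```

The call runs four times in total: the initial attempt plus three retries, after which the last error is surfaced to the caller.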

Circuit Breaker

For persistent upstream failures, the gateway includes a circuit breaker that prevents cascading failures by fast-failing requests when a backend is known to be down:
// src/resilience/CircuitBreaker.ts (simplified)
export const makeCircuitBreaker = (config?) =>
  Effect.gen(function* () {
    const ref = yield* Ref.make<CircuitInternalState>({
      state: "closed",
      failures: 0,
      lastFailureAt: 0,
      probesInFlight: 0,
    })

    const execute = <A, E>(effect: Effect.Effect<A, E>) =>
      Effect.gen(function* () {
        const current = yield* Ref.get(ref)

        if (current.state === "open") {
          const elapsed = Date.now() - current.lastFailureAt
          if (elapsed >= cfg.cooldownMs) {
            // Transition to half-open: allow one probe request
            yield* Ref.update(ref, (s) => ({ ...s, state: "half-open" }))
          } else {
            return yield* Effect.fail(new CircuitBreakerOpen(cfg.cooldownMs - elapsed))
          }
        }

        return yield* Effect.matchEffect(effect, {
          onSuccess: (a) => Effect.as(recordSuccess, a),
          onFailure: (e) => Effect.zipRight(recordFailure, Effect.fail(e)),
        })
      })

    return { state, execute, recordSuccess, recordFailure }
  })
The circuit breaker keeps its internal state (closed/open/half-open, failure count, time of the last failure) in a Ref, so each individual update is atomic even when multiple fibers issue requests concurrently.
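One way to make the open to half-open transition admit exactly one probe is to decide and transition in a single Ref.modify call. This is a sketch of that idea, not the gateway's implementation; the admit function, the Decision type, and the trimmed state shape are all hypothetical:

```typescript
import { Effect, Ref } from "effect"

type CircuitState = "closed" | "open" | "half-open"

interface Internal {
  readonly state: CircuitState
  readonly lastFailureAt: number
}

type Decision = "allow" | "probe" | "reject"

// Decide and transition in one atomic step, so only one caller wins the probe.
const admit = (ref: Ref.Ref<Internal>, now: number, cooldownMs: number) =>
  Ref.modify(ref, (s): readonly [Decision, Internal] => {
    if (s.state === "closed") return ["allow", s]
    if (s.state === "open" && now - s.lastFailureAt >= cooldownMs) {
      return ["probe", { ...s, state: "half-open" }]
    }
    return ["reject", s]
  })

const program = Effect.gen(function* () {
  const ref = yield* Ref.make<Internal>({ state: "open", lastFailureAt: 0 })

  // Three callers arrive concurrently after the cooldown has elapsed.
  const decisions = yield* Effect.all(
    [
      admit(ref, 10_000, 5_000),
      admit(ref, 10_000, 5_000),
      admit(ref, 10_000, 5_000),
    ],
    { concurrency: "unbounded" },
  )

  return decisions.filter((d) => d === "probe").length
})

const probeCount = Effect.runPromise(program)
```

The first caller to reach the Ref flips the state to half-open and gets "probe"; the other two see half-open and are rejected.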

Redacted for Secrets

Configuration values that contain secrets are typed as Redacted<string> — an opaque wrapper that prevents accidental logging or serialization:
// src/config/AppConfig.ts
import { Config, Redacted } from "effect"

export interface AppConfigShape {
  readonly podiumApiKey: Redacted.Redacted<string>
  readonly ensembleApiKey: Redacted.Redacted<string>
  readonly authClientSecret: Redacted.Redacted<string>
  readonly e2bApiKey: Redacted.Redacted<string>
  // ...
}

// Reading the config
const config: AppConfigShape = {
  podiumApiKey: yield* Config.redacted("PODIUM_API_KEY").pipe(
    Config.withDefault(Redacted.make(""))
  ),
  // ...
}
When you need the actual value (e.g., to construct an HTTP header), you must explicitly unwrap it with Redacted.value():
// src/upstream/PodiumClient.ts
const apiKey = Redacted.value(config.podiumApiKey)
const authHeaders = apiKey
  ? { Authorization: `Bearer ${apiKey}` }
  : {}
If you accidentally pass a Redacted value to console.log, JSON.stringify, or Effect’s structured logger, it will render as <redacted> rather than the plaintext secret. This prevents credentials from appearing in logs, error reports, or debug output.
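The rendering behavior can be checked directly. A short sketch with a made-up secret value:

```typescript
import { Redacted } from "effect"

// Hypothetical secret value, for illustration only.
const apiKey = Redacted.make("sk-demo-123")

// String conversion and JSON serialization both hide the secret.
const asString = String(apiKey)
const asJson = JSON.stringify({ apiKey })

// Only an explicit unwrap reveals the underlying value.
const unwrapped = Redacted.value(apiKey)
```

Keeping the Redacted.value call next to the single place that needs the plaintext (such as header construction) keeps the unwrapped string out of longer-lived variables.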

Summary

| Pattern | What It Solves | Where It Appears |
| --- | --- | --- |
| Context.Tag + Layer | Dependency injection without singletons | Every service definition (AuthService, PodiumClient, Broadcaster, etc.) |
| Data.TaggedError | Typed errors in the type signature | src/errors.ts (11 error types) |
| Stream + Queue | Backpressure-aware event streaming | PodiumClient.connect() event stream |
| Effect.forkDaemon | Background fibers that outlive their parent | Event stream consumption in MessageRouterLive |
| Ref + HashMap | Thread-safe mutable state | ConnectionState, activeSessionsRef, Broadcaster |
| Effect.gen | Readable sequential composition | Every handler in the router |
| Schedule | Composable retry policies | RetryPolicy.ts (exponential + jitter) |
| Redacted | Preventing accidental secret logging | AppConfig (API keys, client secrets) |
These patterns are not incidental — they form the structural backbone of the gateway. Together, they ensure that Diminuendo’s business logic is type-safe, composable, testable, and resilient to the failure modes inherent in distributed systems.