Python SDK

The Diminuendo Python SDK provides an async WebSocket client for connecting to the gateway from Python applications. It uses the websockets library for WebSocket communication and Python’s asyncio for concurrency.

Installation

pip install diminuendo-sdk
Alternatively, install only the WebSocket dependency and reference the SDK from source:
pip install websockets
Requires Python 3.11 or later. The SDK uses type union syntax (str | None, available since Python 3.10) together with asyncio improvements and other features introduced in Python 3.11.

DiminuendoClient

The DiminuendoClient class manages the WebSocket connection, authentication handshake, event dispatching, and request/response correlation.
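
Request/response correlation of this kind is commonly built on a table of pending futures keyed by a request ID. The following is a minimal sketch of that pattern, not the SDK's actual implementation; the class name PendingRequests, the "requestId" field, and the message shape are assumptions made for illustration.

```python
import asyncio
import itertools
from typing import Any


class PendingRequests:
    """Hypothetical helper: correlates responses to requests by ID."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._pending: dict[str, asyncio.Future[dict[str, Any]]] = {}

    def create(self) -> tuple[str, asyncio.Future[dict[str, Any]]]:
        """Allocate a request ID and a future that will hold its response."""
        request_id = f"req-{next(self._ids)}"
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def resolve(self, message: dict[str, Any]) -> bool:
        """Resolve the future matching message["requestId"] (assumed field name)."""
        future = self._pending.pop(message.get("requestId", ""), None)
        if future is None or future.done():
            return False
        future.set_result(message)
        return True


async def demo() -> dict[str, Any]:
    pending = PendingRequests()
    request_id, future = pending.create()
    # In a real client, the listener task would call resolve() for each incoming frame.
    pending.resolve({"requestId": request_id, "type": "pong"})
    return await asyncio.wait_for(future, timeout=1.0)
```

Keeping the futures in a dictionary lets a single listener task serve any number of concurrent requests, since each caller awaits only its own future.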

Constructor

from diminuendo_sdk import DiminuendoClient, ClientOptions

client = DiminuendoClient(ClientOptions(
    url="ws://localhost:8080/ws",
    token="your-jwt-token",       # Optional in dev mode
    auto_reconnect=True,           # Default: True
    reconnect_delay=1.0,           # Default: 1.0 seconds
    ping_interval=30.0,            # Default: 30.0 seconds (0 to disable)
))

Option           Type   Default     Description
---------------  -----  ----------  ------------------------------------------------
url              str    (required)  Gateway WebSocket URL
token            str    ""          JWT or API key for authentication
auto_reconnect   bool   True        Automatically reconnect on disconnect
reconnect_delay  float  1.0         Seconds between reconnection attempts
ping_interval    float  30.0        Keepalive ping interval in seconds. 0 to disable

Connection Lifecycle

connect()

Establishes the WebSocket connection and waits for the authentication handshake. A background asyncio task is spawned to listen for incoming messages, dispatch events to registered handlers, and resolve pending request futures.
await client.connect()
print(client.authenticated)  # True
print(client.identity)       # {"userId": "...", "email": "...", "tenantId": "..."}
Authentication must complete within 10 seconds or a ConnectionError is raised.
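
Because a failed handshake surfaces as a ConnectionError, callers may want to retry the initial connection. The sketch below shows one way to do that. connect_with_retry is a hypothetical helper that assumes only an awaitable connect() method as documented above, and FlakyClient is a stand-in so the example runs without a gateway.

```python
import asyncio
from typing import Any


async def connect_with_retry(client: Any, attempts: int = 3, delay: float = 1.0) -> int:
    """Call client.connect() up to `attempts` times; return the attempt that succeeded."""
    for attempt in range(1, attempts + 1):
        try:
            await client.connect()
            return attempt
        except ConnectionError:
            if attempt == attempts:
                raise  # Out of attempts: let the caller see the error
            await asyncio.sleep(delay)
    raise ValueError("attempts must be at least 1")


class FlakyClient:
    """Stand-in that fails twice before connecting, so the sketch runs offline."""

    def __init__(self) -> None:
        self.calls = 0

    async def connect(self) -> None:
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError("authentication timed out")
```

With the real client, the call would look like await connect_with_retry(client, attempts=5).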

disconnect()

Closes the WebSocket connection and cancels the background listener task:
await client.disconnect()

Properties

Property       Type                   Description
-------------  ---------------------  ---------------------------------------
connected      bool                   Whether the WebSocket is currently open
authenticated  bool                   Whether authentication has completed
identity       dict[str, str] | None  The authenticated identity

Session Management

All session management methods are async and return typed dataclass instances:
# List sessions
sessions = await client.list_sessions()
all_sessions = await client.list_sessions(include_archived=True)

# Create a session
session = await client.create_session("coding-agent", "My Session")

# Rename, archive, unarchive, delete
renamed = await client.rename_session(session.id, "New Name")
archived = await client.archive_session(session.id)
restored = await client.unarchive_session(session.id)
await client.delete_session(session.id)

SessionMeta

@dataclass
class SessionMeta:
    id: str
    tenant_id: str
    name: str | None
    agent_type: str
    status: SessionStatus
    archived: bool
    created_at: int
    updated_at: int
    last_activity_at: int | None = None
SessionStatus is a Literal type covering the seven valid session states:
SessionStatus = Literal[
    "inactive", "activating", "ready", "running",
    "waiting", "deactivating", "error",
]
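
A Literal type is enforced only by static type checkers, so a status string arriving from JSON is not validated at runtime. The following is a minimal sketch of a runtime check built with typing.get_args; the names VALID_STATUSES and is_valid_status are illustrative and not part of the SDK.

```python
from typing import Literal, get_args

SessionStatus = Literal[
    "inactive", "activating", "ready", "running",
    "waiting", "deactivating", "error",
]

# get_args() returns the literal values as a tuple of strings.
VALID_STATUSES: frozenset[str] = frozenset(get_args(SessionStatus))


def is_valid_status(value: str) -> bool:
    """Return True if value is one of the documented session states."""
    return value in VALID_STATUSES
```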

Session Interaction

join_session / leave_session

Join a session to subscribe to its event stream. Returns a StateSnapshot containing the current session state:
snapshot = await client.join_session(session_id)
print(snapshot.session.status)       # "inactive" | "ready" | ...
print(snapshot.recent_history)       # List of recent HistoryItem
print(snapshot.subscriber_count)     # Number of connected viewers

# Resume from a known sequence number
snapshot = await client.join_session(session_id, after_seq=42)

# Leave (fire-and-forget)
await client.leave_session(session_id)
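
To resume with after_seq, an application needs to remember the last sequence number it saw. The sketch below is one possible way to track it, assuming sequence numbers are positive and increase within a session. SeqTracker is a hypothetical helper, and SimpleNamespace stands in for ServerEvent so the example runs without the SDK.

```python
from types import SimpleNamespace
from typing import Any


class SeqTracker:
    """Remembers the highest sequence number observed per session."""

    def __init__(self) -> None:
        self._last: dict[str, int] = {}

    def observe(self, event: Any) -> None:
        """Event handler: record event.seq for event.session_id when both are set."""
        if event.session_id is not None and event.seq is not None:
            current = self._last.get(event.session_id, 0)
            self._last[event.session_id] = max(current, event.seq)

    def last_seq(self, session_id: str) -> int:
        """Highest seq seen so far, or 0 if none was observed."""
        return self._last.get(session_id, 0)


tracker = SeqTracker()
# Events may be observed out of order; only the maximum is kept.
for seq in (40, 42, 41):
    tracker.observe(SimpleNamespace(session_id="s-1", seq=seq))
```

With the real client, tracker.observe could be registered through client.on("*", tracker.observe), and tracker.last_seq(session_id) passed as after_seq when rejoining.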

run_turn / stop_turn

Start an agent turn with a user message:
await client.run_turn(session_id, "Fix the bug in auth.py")
await client.run_turn(session_id, "Add tests", client_turn_id="turn-123")

# Cancel an in-progress turn
await client.stop_turn(session_id)

steer

Send mid-turn guidance:
await client.steer(session_id, "Focus on the error handling path")

answer_question

Respond to a question from the agent:
await client.answer_question(
    session_id,
    request_id,
    answers={"language": "Python", "framework": "FastAPI"},
)

# Dismiss without answering
await client.answer_question(session_id, request_id, answers={}, dismissed=True)

get_history / get_events

Retrieve paginated conversation history or raw events:
history = await client.get_history(session_id, after_seq=0, limit=50)
# history = [HistoryItem(id, role, content, created_at), ...]

events = await client.get_events(session_id, after_seq=0, limit=200)
# events = [{"type": "text_delta", "text": "...", ...}, ...]
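
The after_seq and limit parameters suggest cursor-style paging. The sketch below walks all pages under two assumptions that the text above does not confirm: every event dict carries an integer "seq" field, and after_seq is exclusive. fetch_all_events is a hypothetical helper, and FakeClient is an in-memory stand-in.

```python
import asyncio
from typing import Any


async def fetch_all_events(client: Any, session_id: str, page_size: int = 200) -> list[dict[str, Any]]:
    """Page through get_events until a short page signals the end."""
    events: list[dict[str, Any]] = []
    after_seq = 0
    while True:
        page = await client.get_events(session_id, after_seq=after_seq, limit=page_size)
        events.extend(page)
        if len(page) < page_size:
            return events
        # Assumption: the last event of a full page holds the next cursor.
        after_seq = page[-1]["seq"]


class FakeClient:
    """In-memory stand-in so the sketch runs without a gateway."""

    def __init__(self, total: int) -> None:
        self._events = [{"type": "text_delta", "seq": n} for n in range(1, total + 1)]

    async def get_events(self, session_id: str, after_seq: int = 0, limit: int = 200) -> list[dict[str, Any]]:
        newer = [e for e in self._events if e["seq"] > after_seq]
        return newer[:limit]
```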

ping

Measure round-trip latency:
latency_ms = await client.ping()
print(f"Latency: {latency_ms}ms")

File Access

Access files in the agent’s workspace:
# List files
files = await client.list_files(session_id)
nested = await client.list_files(session_id, path="src/", depth=2)

# Read a file
content = await client.read_file(session_id, "src/main.py")
print(content.content)     # File text
print(content.encoding)    # "utf-8"
print(content.size)        # Size in bytes

# Version history
history = await client.file_history(session_id, "src/main.py")
# [IterationMeta(iteration=1, timestamp=..., size=..., hash=...), ...]

# Read at a specific version
old = await client.file_at_iteration(session_id, "src/main.py", iteration=3)
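
Version history becomes more useful when two iterations can be compared. The sketch below produces a unified diff with the standard difflib module. It assumes file_at_iteration returns an object with a text content attribute, as read_file does; diff_iterations is a hypothetical helper, and FakeClient is a stand-in so the example runs offline.

```python
import asyncio
import difflib
from types import SimpleNamespace
from typing import Any


async def diff_iterations(client: Any, session_id: str, path: str, old: int, new: int) -> str:
    """Return a unified diff between two iterations of a workspace file."""
    before = await client.file_at_iteration(session_id, path, iteration=old)
    after = await client.file_at_iteration(session_id, path, iteration=new)
    lines = difflib.unified_diff(
        before.content.splitlines(keepends=True),
        after.content.splitlines(keepends=True),
        fromfile=f"{path}@{old}",
        tofile=f"{path}@{new}",
    )
    return "".join(lines)


class FakeClient:
    """Stand-in that serves file versions from a dict keyed by iteration."""

    def __init__(self, versions: dict[int, str]) -> None:
        self._versions = versions

    async def file_at_iteration(self, session_id: str, path: str, iteration: int) -> SimpleNamespace:
        return SimpleNamespace(content=self._versions[iteration])
```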

File Types

@dataclass
class FileEntry:
    path: str
    name: str
    is_directory: bool
    size: int | None = None
    modified_at: int | None = None

@dataclass
class FileContent:
    session_id: str
    path: str
    content: str
    encoding: str
    size: int

@dataclass
class IterationMeta:
    iteration: int
    timestamp: int
    size: int
    hash: str | None = None
All dataclasses provide a from_dict classmethod that maps camelCase JSON fields to snake_case Python attributes.

Member Management

Manage tenant members:
result = await client.manage_members("list")
result = await client.manage_members("set_role", user_id="user-123", role="admin")
result = await client.manage_members("remove", user_id="user-123")

Event Handling

Register event handlers using the on() method. It returns an unsubscribe callable:
def on_text_delta(event: ServerEvent):
    print(event.text, end="", flush=True)

def on_turn_complete(event: ServerEvent):
    print(f"\nTurn complete: {event.final_text[:50]}...")

# Subscribe
unsub_delta = client.on("text_delta", on_text_delta)
unsub_complete = client.on("turn_complete", on_turn_complete)

# Wildcard handler — receives all events
client.on("*", lambda e: print(f"[{e.type}]"))

# Unsubscribe
unsub_delta()

ServerEvent

The ServerEvent dataclass provides a generic event representation with convenience properties for accessing common fields without manual dictionary traversal:
@dataclass
class ServerEvent:
    type: str
    data: dict[str, Any] = field(default_factory=dict)

    # Convenience properties (read-only accessors over data, listed as annotations for brevity)
    session_id: str | None     # data["sessionId"]
    turn_id: str | None        # data["turnId"]
    text: str                  # data["text"] (empty string default)
    final_text: str            # data["finalText"]
    error_message: str         # data["message"]
    error_code: str            # data["code"]
    seq: int | None            # data["seq"]
    ts: int | None             # data["ts"]

    # Tool events
    tool_call_id: str | None
    tool_name: str | None
    tool_args: Any
    tool_output: str | None
    tool_status: str | None

    # Question events
    request_id: str | None
    questions: list[dict[str, Any]]

    # Usage
    input_tokens: int | None
    output_tokens: int | None
    model: str | None

Category Checks

ServerEvent provides boolean properties for checking event categories:
event.is_turn_event      # turn_started, text_delta, turn_complete, turn_error
event.is_tool_event      # tool_call, tool_result, tool_call_start, tool_call_delta, tool_error
event.is_thinking_event  # thinking_start, thinking_progress, thinking_complete
event.is_terminal_event  # terminal_stream, terminal_complete
event.is_sandbox_event   # sandbox_init, sandbox_provisioning, sandbox_ready, sandbox_removed
event.is_interactive_event  # question_requested, permission_requested, approval_resolved

StateSnapshot

Returned when joining a session:
@dataclass
class StateSnapshot:
    session_id: str
    session: SessionMeta
    current_turn: dict[str, Any] | None
    recent_history: list[HistoryItem]
    subscriber_count: int
    sandbox: dict[str, Any] | None

HistoryItem

@dataclass
class HistoryItem:
    id: str
    role: str
    content: str
    created_at: int
    metadata: Any = None

Complete Example

import asyncio
from diminuendo_sdk import DiminuendoClient, ClientOptions, ServerEvent


async def main():
    client = DiminuendoClient(ClientOptions(
        url="ws://localhost:8080/ws",
        token="",  # Empty for dev mode
    ))

    # Connect and authenticate
    await client.connect()
    print(f"Authenticated: {client.authenticated}")

    # Create a session
    session = await client.create_session("coding-agent", "Bug Fix Session")
    print(f"Created session: {session.id}")

    # Join and inspect state
    snapshot = await client.join_session(session.id)
    print(f"Status: {snapshot.session.status}")
    print(f"Subscribers: {snapshot.subscriber_count}")

    # Register event handlers
    turn_complete = asyncio.Event()

    def on_text(event: ServerEvent):
        print(event.text, end="", flush=True)

    def on_tool_call(event: ServerEvent):
        print(f"\n[tool] {event.tool_name}({event.tool_args})")

    def on_tool_result(event: ServerEvent):
        print(f"[result] {event.tool_status}: {event.tool_output}")

    def on_complete(event: ServerEvent):
        print("\n--- Turn complete ---")
        turn_complete.set()

    def on_error(event: ServerEvent):
        print(f"\n[error] {event.error_message}")
        turn_complete.set()

    client.on("text_delta", on_text)
    client.on("tool_call", on_tool_call)
    client.on("tool_result", on_tool_result)
    client.on("turn_complete", on_complete)
    client.on("turn_error", on_error)

    # Start a turn
    await client.run_turn(session.id, "Fix the authentication bug in auth.py")

    # Wait for the turn to finish
    await turn_complete.wait()

    # Clean up
    await client.leave_session(session.id)
    await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
For long-running applications, register a disconnected event handler to detect connection loss and implement custom reconnection logic beyond the built-in auto_reconnect behavior.
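
Custom reconnection logic often combines capped exponential backoff with rejoining the sessions that were open. The following is a minimal sketch under stated assumptions: backoff_delays and reconnect_and_rejoin are hypothetical helpers, the client is assumed to expose only connect() and join_session() as documented above, and FakeClient is a stand-in so the example runs offline.

```python
import asyncio
from typing import Any, Iterator


def backoff_delays(base: float = 1.0, factor: float = 2.0, cap: float = 30.0) -> Iterator[float]:
    """Yield base, base * factor, ... with each value limited to cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor


async def reconnect_and_rejoin(
    client: Any, session_ids: list[str], max_attempts: int = 5, base: float = 1.0
) -> bool:
    """Retry connect() with growing delays, then rejoin the given sessions."""
    delays = backoff_delays(base=base)
    for _ in range(max_attempts):
        try:
            await client.connect()
        except ConnectionError:
            await asyncio.sleep(next(delays))
            continue
        for session_id in session_ids:
            await client.join_session(session_id)
        return True
    return False


class FakeClient:
    """Stand-in: connect() fails `failures` times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.joined: list[str] = []

    async def connect(self) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("gateway unreachable")

    async def join_session(self, session_id: str) -> None:
        self.joined.append(session_id)
```

A handler registered for the disconnected event could schedule this coroutine with asyncio.create_task. When doing so, it is probably wise to set auto_reconnect=False so that two retry loops do not compete.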