Python SDK
The Diminuendo Python SDK provides an async WebSocket client for connecting to the gateway from Python applications. It uses the websockets library for WebSocket communication and Python’s asyncio for concurrency.
Installation
pip install diminuendo-sdk
Or install the websockets dependency directly and reference the SDK from source:
pip install websockets
Requires Python 3.11 or later. The SDK uses union type syntax (str | None, available since Python 3.10) together with asyncio improvements and other features introduced in Python 3.11.
DiminuendoClient
The DiminuendoClient class manages the WebSocket connection, authentication handshake, event dispatching, and request/response correlation.
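Request/response correlation of this kind is commonly built on a table of pending futures keyed by a request ID. The following is a minimal sketch of that pattern, not the SDK's actual implementation; the `RequestCorrelator` class, the `requestId` field name, and the simulated reply are all assumptions made for illustration.

```python
import asyncio
import itertools
from typing import Any


class RequestCorrelator:
    """Hypothetical helper: pairs outgoing requests with incoming replies."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._pending: dict[str, asyncio.Future] = {}

    def register(self) -> tuple[str, asyncio.Future]:
        # Allocate a fresh ID and a future the caller can await.
        request_id = f"req-{next(self._ids)}"
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def resolve(self, message: dict[str, Any]) -> bool:
        # Called by the listener task for every incoming message.
        # "requestId" is an assumed field name, not confirmed by the SDK.
        future = self._pending.pop(message.get("requestId", ""), None)
        if future is None or future.done():
            return False  # Not a reply: treat it as an ordinary event.
        future.set_result(message)
        return True


async def demo() -> dict[str, Any]:
    correlator = RequestCorrelator()
    request_id, future = correlator.register()
    # Simulate the server replying a moment later.
    asyncio.get_running_loop().call_later(
        0.01, correlator.resolve, {"requestId": request_id, "ok": True}
    )
    return await asyncio.wait_for(future, timeout=1.0)
```

Messages for which `resolve` returns `False` would be handed to the event handlers instead, which keeps replies and streamed events on a single listener loop.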
Constructor
from diminuendo_sdk import DiminuendoClient, ClientOptions
client = DiminuendoClient(ClientOptions(
    url="ws://localhost:8080/ws",
    token="your-jwt-token",  # Optional in dev mode
    auto_reconnect=True,     # Default: True
    reconnect_delay=1.0,     # Default: 1.0 seconds
    ping_interval=30.0,      # Default: 30.0 seconds (0 to disable)
))
| Option | Type | Default | Description |
|---|---|---|---|
| url | str | (required) | Gateway WebSocket URL |
| token | str | "" | JWT or API key for authentication |
| auto_reconnect | bool | True | Automatically reconnect on disconnect |
| reconnect_delay | float | 1.0 | Seconds between reconnection attempts |
| ping_interval | float | 30.0 | Keepalive ping interval in seconds. 0 to disable |
Connection Lifecycle
connect()
Establishes the WebSocket connection and waits for the authentication handshake. A background asyncio task is spawned to listen for incoming messages, dispatch events to registered handlers, and resolve pending request futures.
await client.connect()
print(client.authenticated) # True
print(client.identity) # {"userId": "...", "email": "...", "tenantId": "..."}
Authentication must complete within 10 seconds or a ConnectionError is raised.
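A deadline like this is typically enforced with `asyncio.wait_for`. The sketch below shows one way to turn a handshake timeout into a `ConnectionError`; the `wait_for_auth` helper and the event used to signal authentication are hypothetical, and the SDK's internals may differ.

```python
import asyncio


async def wait_for_auth(authenticated: asyncio.Event, timeout: float = 10.0) -> None:
    """Hypothetical helper: wait for the auth signal or raise ConnectionError."""
    try:
        await asyncio.wait_for(authenticated.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        raise ConnectionError(
            f"Authentication did not complete within {timeout} seconds"
        ) from None


async def demo() -> str:
    never_set = asyncio.Event()  # Simulates a gateway that never responds.
    try:
        await wait_for_auth(never_set, timeout=0.05)
    except ConnectionError as exc:
        return str(exc)
    return "authenticated"
```

Callers of `connect()` can catch `ConnectionError` in the same way to distinguish a failed handshake from other errors.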
disconnect()
Closes the WebSocket connection and cancels the background listener task:
await client.disconnect()
Properties
| Property | Type | Description |
|---|---|---|
| connected | bool | Whether the WebSocket is currently open |
| authenticated | bool | Whether authentication has completed |
| identity | dict[str, str] \| None | The authenticated identity |
Session Management
All session management methods are async and return typed dataclass instances:
# List sessions
sessions = await client.list_sessions()
all_sessions = await client.list_sessions(include_archived=True)
# Create a session
session = await client.create_session("coding-agent", "My Session")
# Rename, archive, unarchive, delete
renamed = await client.rename_session(session.id, "New Name")
archived = await client.archive_session(session.id)
restored = await client.unarchive_session(session.id)
await client.delete_session(session.id)
@dataclass
class SessionMeta:
    id: str
    tenant_id: str
    name: str | None
    agent_type: str
    status: SessionStatus
    archived: bool
    created_at: int
    updated_at: int
    last_activity_at: int | None = None
Where SessionStatus is a Literal type covering the 7 valid session states:
SessionStatus = Literal[
    "inactive", "activating", "ready", "running",
    "waiting", "deactivating", "error",
]
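Because `Literal` types are only checked statically, a status string arriving over the wire is not validated at runtime. One way to validate it is sketched below using `typing.get_args`; the `parse_status` helper is hypothetical and not part of the SDK.

```python
from typing import Literal, cast, get_args

SessionStatus = Literal[
    "inactive", "activating", "ready", "running",
    "waiting", "deactivating", "error",
]

# get_args returns the literal values as a tuple of strings.
VALID_STATUSES = frozenset(get_args(SessionStatus))


def parse_status(raw: str) -> SessionStatus:
    """Hypothetical helper: reject strings that are not a known status."""
    if raw not in VALID_STATUSES:
        raise ValueError(f"Unknown session status: {raw!r}")
    return cast(SessionStatus, raw)
```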
Session Interaction
join_session / leave_session
Join a session to subscribe to its event stream. Returns a StateSnapshot containing the current session state:
snapshot = await client.join_session(session_id)
print(snapshot.session.status) # "inactive" | "ready" | ...
print(snapshot.recent_history) # List of recent HistoryItem
print(snapshot.subscriber_count) # Number of connected viewers
# Resume from a known sequence number
snapshot = await client.join_session(session_id, after_seq=42)
# Leave (fire-and-forget)
await client.leave_session(session_id)
run_turn / stop_turn
Start an agent turn with a user message:
await client.run_turn(session_id, "Fix the bug in auth.py")
await client.run_turn(session_id, "Add tests", client_turn_id="turn-123")
# Cancel an in-progress turn
await client.stop_turn(session_id)
steer
Send mid-turn guidance:
await client.steer(session_id, "Focus on the error handling path")
answer_question
Respond to a question from the agent:
await client.answer_question(
    session_id,
    request_id,
    answers={"language": "Python", "framework": "FastAPI"},
)
# Dismiss without answering
await client.answer_question(session_id, request_id, answers={}, dismissed=True)
get_history / get_events
Retrieve paginated conversation history or raw events:
history = await client.get_history(session_id, after_seq=0, limit=50)
# history = [HistoryItem(id, role, content, created_at), ...]
events = await client.get_events(session_id, after_seq=0, limit=200)
# events = [{"type": "text_delta", "text": "...", ...}, ...]
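When a session has more events than one call returns, the `after_seq` and `limit` parameters can drive a pagination loop. The sketch below assumes that `after_seq` is exclusive, that each event dict carries a `seq` field, and that a short page signals the end; none of these is confirmed here. The `fetch_page` callable stands in for a call such as `client.get_events`.

```python
import asyncio
from typing import Any, Awaitable, Callable

Event = dict[str, Any]
FetchPage = Callable[[int, int], Awaitable[list[Event]]]


async def fetch_all(fetch_page: FetchPage, limit: int = 200) -> list[Event]:
    """Collect every event by repeatedly asking for the next page."""
    collected: list[Event] = []
    after_seq = 0
    while True:
        page = await fetch_page(after_seq, limit)
        collected.extend(page)
        if len(page) < limit:
            return collected  # A short (or empty) page means no more data.
        after_seq = page[-1]["seq"]  # Assumed field; resume after the last one.


async def demo() -> list[int]:
    # In-memory stand-in for the gateway: events with seq 1..5.
    store = [{"type": "text_delta", "seq": n} for n in range(1, 6)]

    async def fake_fetch(after_seq: int, limit: int) -> list[Event]:
        return [e for e in store if e["seq"] > after_seq][:limit]

    events = await fetch_all(fake_fetch, limit=2)
    return [e["seq"] for e in events]
```

With a real client, `fetch_page` could be a small wrapper such as `lambda after, lim: client.get_events(session_id, after_seq=after, limit=lim)`.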
ping
Measure round-trip latency:
latency_ms = await client.ping()
print(f"Latency: {latency_ms}ms")
File Access
Access files in the agent’s workspace:
# List files
files = await client.list_files(session_id)
nested = await client.list_files(session_id, path="src/", depth=2)
# Read a file
content = await client.read_file(session_id, "src/main.py")
print(content.content) # File text
print(content.encoding) # "utf-8"
print(content.size) # Size in bytes
# Version history
history = await client.file_history(session_id, "src/main.py")
# [IterationMeta(iteration=1, timestamp=..., size=..., hash=...), ...]
# Read at a specific version
old = await client.file_at_iteration(session_id, "src/main.py", iteration=3)
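Once two versions of a file have been read, the standard library's `difflib` can show what changed between them. A minimal sketch, assuming both versions are available as text (as in the `content` attribute above); the `diff_versions` helper and the sample strings are illustrative only.

```python
import difflib


def diff_versions(old_text: str, new_text: str, path: str) -> str:
    """Return a unified diff between two versions of the same file."""
    lines = difflib.unified_diff(
        old_text.splitlines(keepends=True),
        new_text.splitlines(keepends=True),
        fromfile=f"{path} (old)",
        tofile=f"{path} (new)",
    )
    return "".join(lines)


# Sample contents standing in for two iterations of the same file.
old = "def login(user):\n    return True\n"
new = "def login(user):\n    return check_password(user)\n"
print(diff_versions(old, new, "src/main.py"))
```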
File Types
@dataclass
class FileEntry:
    path: str
    name: str
    is_directory: bool
    size: int | None = None
    modified_at: int | None = None

@dataclass
class FileContent:
    session_id: str
    path: str
    content: str
    encoding: str
    size: int

@dataclass
class IterationMeta:
    iteration: int
    timestamp: int
    size: int
    hash: str | None = None
All dataclasses provide a from_dict classmethod that maps camelCase JSON fields to snake_case Python attributes.
Member Management
Manage tenant members:
result = await client.manage_members("list")
result = await client.manage_members("set_role", user_id="user-123", role="admin")
result = await client.manage_members("remove", user_id="user-123")
Event Handling
Register event handlers using the on() method. It returns an unsubscribe callable:
def on_text_delta(event: ServerEvent):
    print(event.text, end="", flush=True)

def on_turn_complete(event: ServerEvent):
    print(f"\nTurn complete: {event.final_text[:50]}...")
# Subscribe
unsub_delta = client.on("text_delta", on_text_delta)
unsub_complete = client.on("turn_complete", on_turn_complete)
# Wildcard handler — receives all events
client.on("*", lambda e: print(f"[{e.type}]"))
# Unsubscribe
unsub_delta()
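The subscribe/unsubscribe pattern above can be implemented with a handler registry whose `on()` returns a closure. The sketch below is a simplified, hypothetical `EventBus`, not the SDK's dispatcher; events are plain dicts here, and running wildcard handlers after type-specific ones is an arbitrary choice.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], None]


class EventBus:
    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, event_type: str, handler: Handler) -> Callable[[], None]:
        self._handlers[event_type].append(handler)

        def unsubscribe() -> None:
            # Safe to call more than once.
            if handler in self._handlers[event_type]:
                self._handlers[event_type].remove(handler)

        return unsubscribe

    def emit(self, event: dict[str, Any]) -> None:
        # Copy the lists so handlers may unsubscribe while being dispatched.
        specific = list(self._handlers[event["type"]])
        wildcard = list(self._handlers["*"])
        for handler in specific + wildcard:
            handler(event)
```

Returning a closure means callers do not need to keep a reference to the handler itself in order to remove it later.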
ServerEvent
The ServerEvent dataclass provides a generic event representation with convenience properties for accessing common fields without manual dictionary traversal:
@dataclass
class ServerEvent:
    type: str
    data: dict[str, Any] = field(default_factory=dict)

    # Convenience properties
    session_id: str | None   # data["sessionId"]
    turn_id: str | None      # data["turnId"]
    text: str                # data["text"] (empty string default)
    final_text: str          # data["finalText"]
    error_message: str       # data["message"]
    error_code: str          # data["code"]
    seq: int | None          # data["seq"]
    ts: int | None           # data["ts"]

    # Tool events
    tool_call_id: str | None
    tool_name: str | None
    tool_args: Any
    tool_output: str | None
    tool_status: str | None

    # Question events
    request_id: str | None
    questions: list[dict[str, Any]]

    # Usage
    input_tokens: int | None
    output_tokens: int | None
    model: str | None
Category Checks
ServerEvent provides boolean properties for checking event categories:
event.is_turn_event # turn_started, text_delta, turn_complete, turn_error
event.is_tool_event # tool_call, tool_result, tool_call_start, tool_call_delta, tool_error
event.is_thinking_event # thinking_start, thinking_progress, thinking_complete
event.is_terminal_event # terminal_stream, terminal_complete
event.is_sandbox_event # sandbox_init, sandbox_provisioning, sandbox_ready, sandbox_removed
event.is_interactive_event # question_requested, permission_requested, approval_resolved
StateSnapshot
Returned when joining a session:
@dataclass
class StateSnapshot:
    session_id: str
    session: SessionMeta
    current_turn: dict[str, Any] | None
    recent_history: list[HistoryItem]
    subscriber_count: int
    sandbox: dict[str, Any] | None
HistoryItem
@dataclass
class HistoryItem:
    id: str
    role: str
    content: str
    created_at: int
    metadata: Any = None
Complete Example
import asyncio
from diminuendo_sdk import DiminuendoClient, ClientOptions, ServerEvent

async def main():
    client = DiminuendoClient(ClientOptions(
        url="ws://localhost:8080/ws",
        token="",  # Empty for dev mode
    ))

    # Connect and authenticate
    await client.connect()
    print(f"Authenticated: {client.authenticated}")

    # Create a session
    session = await client.create_session("coding-agent", "Bug Fix Session")
    print(f"Created session: {session.id}")

    # Join and inspect state
    snapshot = await client.join_session(session.id)
    print(f"Status: {snapshot.session.status}")
    print(f"Subscribers: {snapshot.subscriber_count}")

    # Register event handlers
    turn_complete = asyncio.Event()

    def on_text(event: ServerEvent):
        print(event.text, end="", flush=True)

    def on_tool_call(event: ServerEvent):
        print(f"\n[tool] {event.tool_name}({event.tool_args})")

    def on_tool_result(event: ServerEvent):
        print(f"[result] {event.tool_status}: {event.tool_output}")

    def on_complete(event: ServerEvent):
        print("\n--- Turn complete ---")
        turn_complete.set()

    def on_error(event: ServerEvent):
        print(f"\n[error] {event.error_message}")
        turn_complete.set()

    client.on("text_delta", on_text)
    client.on("tool_call", on_tool_call)
    client.on("tool_result", on_tool_result)
    client.on("turn_complete", on_complete)
    client.on("turn_error", on_error)

    # Start a turn
    await client.run_turn(session.id, "Fix the authentication bug in auth.py")

    # Wait for the turn to finish
    await turn_complete.wait()

    # Clean up
    await client.leave_session(session.id)
    await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
For long-running applications, register a disconnected event handler to detect connection loss and implement custom reconnection logic beyond the built-in auto_reconnect behavior.
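Custom reconnection logic often uses exponential backoff with a cap instead of a fixed delay. The following sketch shows a delay schedule and a retry loop around a generic `connect` coroutine; the `backoff_delays` and `reconnect_with_backoff` helpers and their parameters are assumptions, and wiring them to a disconnected handler is left to the application.

```python
import asyncio
from typing import Awaitable, Callable, Iterator


def backoff_delays(
    base: float = 1.0, factor: float = 2.0, cap: float = 30.0
) -> Iterator[float]:
    """Yield base, base*factor, ... capped at `cap` seconds, indefinitely."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor


async def reconnect_with_backoff(
    connect: Callable[[], Awaitable[None]],
    max_attempts: int = 5,
    base: float = 1.0,
) -> int:
    """Retry `connect` until it succeeds; return the number of attempts used."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delays = backoff_delays(base=base)
    attempt = 0
    while True:
        attempt += 1
        try:
            await connect()
            return attempt
        except OSError:  # ConnectionError is a subclass of OSError.
            if attempt == max_attempts:
                raise  # Give up and surface the last failure.
            await asyncio.sleep(next(delays))
```

With the SDK, `connect` could be `client.connect`, called after setting `auto_reconnect=False` so that the two reconnection mechanisms do not compete.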