Documentation Index
Fetch the complete documentation index at: https://iii.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
Installation
Initialization
Create an III client and connect to the engine.
Blocks until the WebSocket connection is established and ready.
from iii import register_worker, InitOptions
iii = register_worker('ws://localhost:49134', InitOptions(worker_name='my-worker'))
Methods
connect_async
Connect to the III Engine via WebSocket.
Initializes OpenTelemetry (if configured), attaches the event loop,
and establishes the WebSocket connection. This is called automatically
during construction — use it only if you need to reconnect manually
from an async context.
Signature
create_channel
Create a streaming channel pair for worker-to-worker data transfer.
The returned Channel contains a local writer / reader
and their serializable refs (writer_ref, reader_ref) that
can be passed as fields in invocation data to other functions.
Signature
create_channel(buffer_size: int | None = None)
Parameters
| Name | Type | Required | Description |
|---|
buffer_size | int | None | No | Buffer capacity for the channel. Defaults to 64. |
Example
ch = iii.create_channel()
fn = iii.register_function("producer", producer_handler)
iii.trigger({"function_id": "producer", "payload": {"output": ch.writer_ref}})
create_channel_async
Create a streaming channel pair for worker-to-worker data transfer.
The returned Channel contains a local writer / reader
and their serializable refs (writer_ref, reader_ref) that
can be passed as fields in invocation data to other functions.
Signature
async (buffer_size: int | None = None)
Parameters
| Name | Type | Required | Description |
|---|
buffer_size | int | None | No | Buffer capacity for the channel. Defaults to 64. |
Example
ch = await iii.create_channel_async()
fn = iii.register_function("producer", producer_handler)
await iii.trigger_async({"function_id": "producer", "payload": {"output": ch.writer_ref}})
create_stream
Register a custom stream implementation, overriding the engine default.
Registers 5 of the 6 IStream methods (get, set, delete,
list, list_groups). The update method is not registered
— atomic updates are handled by the engine’s built-in stream update logic.
Signature
create_stream(stream_name: str, stream: IStream[Any])
Parameters
| Name | Type | Required | Description |
|---|
stream_name | str | Yes | Unique name for the stream. |
stream | IStream[Any] | Yes | An object implementing the IStream interface. |
Example
from iii.stream import IStream
class MyStream(IStream):
async def get(self, input): ...
async def set(self, input): ...
async def delete(self, input): ...
async def list(self, input): ...
async def list_groups(self, input): ...
async def update(self, input): ...
iii.create_stream("my-stream", MyStream())
get_connection_state
Return the current WebSocket connection state.
Signature
register_function
Register a function with the engine.
Pass a handler for local execution, or an HttpInvocationConfig
for HTTP-invoked functions (Lambda, Cloudflare Workers, etc.).
Handlers can be synchronous or asynchronous. Sync handlers are
automatically wrapped with run_in_executor so they do not
block the event loop. Each handler receives a single data
argument containing the trigger payload.
request_format and response_format are auto-extracted
from the handler’s type hints when omitted or passed as None
(the default). To opt out of auto-extraction, pass an explicit
schema (RegisterFunctionFormat or dict). This behavior
is Python-specific — the Node SDK does not auto-extract from TS
types, because TypeScript types are erased at runtime.
Signature
register_function(function_id: str, handler_or_invocation: RemoteFunctionHandler | HttpInvocationConfig, description: str | None = None, metadata: dict[str, Any] | None = None, request_format: RegisterFunctionFormat | dict[str, Any] | None = None, response_format: RegisterFunctionFormat | dict[str, Any] | None = None)
Parameters
| Name | Type | Required | Description |
|---|
function_id | str | Yes | Unique string identifier for the function. |
handler_or_invocation | RemoteFunctionHandler | HttpInvocationConfig | Yes | A callable handler or HttpInvocationConfig. Callable handlers receive one positional argument (data — the trigger payload) and may return a value. |
description | str | None | No | Human-readable description. |
metadata | dict[str, Any] | None | No | Arbitrary metadata. |
request_format | RegisterFunctionFormat | dict[str, Any] | None | No | Schema describing expected input. When None (default), auto-extracted from the handler’s first-parameter type hint. Pass an explicit schema to override; there is no way to register with no schema when the handler is typed. |
response_format | RegisterFunctionFormat | dict[str, Any] | None | No | Schema describing expected output. Same auto-extraction semantics as request_format. |
Example
def greet(data):
return {'message': f"Hello, {data['name']}!"}
fn = iii.register_function("greet", greet, description="Greets a user")
fn.unregister()
from pydantic import BaseModel
class GreetInput(BaseModel):
name: str
class GreetOutput(BaseModel):
message: str
async def greet(data: GreetInput) -> GreetOutput:
return GreetOutput(message=f"Hello, {data.name}!")
fn = iii.register_function("greet", greet, description="Greets a user")
register_service
Register a logical service grouping with the engine.
Services provide an organisational hierarchy for functions. A
service can optionally reference a parent_service_id to form
a tree visible in the console.
Signature
register_service(service: RegisterServiceInput | dict[str, Any])
Parameters
| Name | Type | Required | Description |
|---|
service | RegisterServiceInput | dict[str, Any] | Yes | A RegisterServiceInput or dict with id and optional name, description, parent_service_id. |
Example
iii.register_service({"id": "payments", "description": "Payment processing"})
iii.register_service({
"id": "payments::refunds",
"description": "Refund sub-service",
"parent_service_id": "payments",
})
register_trigger
Bind a trigger configuration to a registered function.
Signature
register_trigger(trigger: RegisterTriggerInput | dict[str, Any])
Parameters
| Name | Type | Required | Description |
|---|
trigger | RegisterTriggerInput | dict[str, Any] | Yes | A RegisterTriggerInput or dict with type, function_id, and optional config. |
Example
trigger = iii.register_trigger({
'type': 'http',
'function_id': 'greet',
'config': {'api_path': '/greet', 'http_method': 'GET'}
})
trigger = iii.register_trigger(RegisterTriggerInput(
type="http", function_id="greet",
config={'api_path': '/greet', 'http_method': 'GET'}
))
trigger.unregister()
register_trigger_type
Register a custom trigger type with the engine.
Returns a :class:TriggerTypeRef handle with register_trigger
and register_function methods.
Signature
register_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any], handler: TriggerHandler[Any])
Parameters
| Name | Type | Required | Description |
|---|
trigger_type | RegisterTriggerTypeInput | dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id, description, and optional trigger_request_format / call_request_format (Pydantic class or dict). |
handler | TriggerHandler[Any] | Yes | A TriggerHandler instance. |
Example
webhook = iii.register_trigger_type(
RegisterTriggerTypeInput(
id="webhook",
description="Webhook trigger",
trigger_request_format=WebhookConfig,
call_request_format=WebhookCallRequest,
),
WebhookHandler(),
)
webhook.register_function("handler", handle_webhook)
webhook.register_trigger("handler", WebhookConfig(url="/hook"))
shutdown
Gracefully shut down the client, releasing all resources.
Cancels any pending reconnection attempts, rejects all in-flight
invocations with an error, closes the WebSocket connection, and
stops the background event-loop thread. After this call the
instance must not be reused.
Signature
Example
iii = register_worker('ws://localhost:49134')
# ... do work ...
iii.shutdown()
shutdown_async
Gracefully shut down the client, releasing all resources.
Cancels any pending reconnection attempts, rejects all in-flight
invocations with an error, closes the WebSocket connection, and
stops the background event-loop thread. After this call the
instance must not be reused.
Signature
Example
iii = register_worker('ws://localhost:49134')
# ... do work ...
await iii.shutdown_async()
trigger
Invoke a remote function.
The routing behavior and return type depend on the action field:
- No action: synchronous — waits for the function to return.
TriggerAction.Enqueue(...): async via named queue — returns EnqueueResult.
TriggerAction.Void(): fire-and-forget — returns None.
Signature
trigger(request: dict[str, Any] | TriggerRequest)
Parameters
| Name | Type | Required | Description |
|---|
request | dict[str, Any] | TriggerRequest | Yes | A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms. |
Example
result = iii.trigger({'function_id': 'greet', 'payload': {'name': 'World'}})
iii.trigger({'function_id': 'notify', 'payload': {}, 'action': TriggerAction.Void()})
trigger_async
Invoke a remote function.
The routing behavior and return type depend on the action field:
- No action: synchronous — waits for the function to return.
TriggerAction.Enqueue(...): async via named queue — returns EnqueueResult.
TriggerAction.Void(): fire-and-forget — returns None.
Signature
async (request: dict[str, Any] | TriggerRequest)
Parameters
| Name | Type | Required | Description |
|---|
request | dict[str, Any] | TriggerRequest | Yes | A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms. |
Example
result = await iii.trigger_async({'function_id': 'greet', 'payload': {'name': 'World'}})
await iii.trigger_async({'function_id': 'notify', 'payload': {}, 'action': TriggerAction.Void()})
unregister_trigger_type
Unregister a previously registered trigger type.
Signature
unregister_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any])
Parameters
| Name | Type | Required | Description |
|---|
trigger_type | RegisterTriggerTypeInput | dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id and optional description. |
Example
iii.unregister_trigger_type({"id": "webhook", "description": "Webhook trigger"})
iii.unregister_trigger_type(RegisterTriggerTypeInput(id="webhook", description="Webhook trigger"))
Logger
Structured logger that emits logs as OpenTelemetry LogRecords.
Every log call automatically captures the active trace and span context,
correlating your logs with distributed traces without any manual wiring.
When OTel is not initialized, Logger gracefully falls back to Python
logging.
Pass structured data as the second argument to any log method. Using a
dict of key-value pairs (instead of string interpolation) lets you
filter, aggregate, and build dashboards in your observability backend.
debug
Log a debug-level message.
Signature
debug(message: str, data: Any = None)
Parameters
| Name | Type | Required | Description |
|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
logger.debug('Cache lookup', {'key': 'user:42', 'hit': False})
error
Log an error-level message.
Signature
error(message: str, data: Any = None)
Parameters
| Name | Type | Required | Description |
|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
logger.error('Payment failed', {
'order_id': 'ord_123',
'gateway': 'stripe',
'error_code': 'card_declined',
})
info
Log an info-level message.
Signature
info(message: str, data: Any = None)
Parameters
| Name | Type | Required | Description |
|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
logger.info('Order processed', {'order_id': 'ord_123', 'status': 'completed'})
warn
Log a warning-level message.
Signature
warn(message: str, data: Any = None)
Parameters
| Name | Type | Required | Description |
|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
logger.warn('Retry attempt', {'attempt': 3, 'max_retries': 5, 'endpoint': '/api/charge'})
Types
InitOptions · ReconnectionConfig · TelemetryOptions · HttpInvocationConfig · RegisterFunctionFormat · RegisterServiceInput · RegisterTriggerInput · RegisterTriggerTypeInput · TriggerActionEnqueue · TriggerActionVoid · TriggerRequest · IStream · OtelConfig · TriggerHandler
InitOptions
Options for configuring the III SDK.
| Name | Type | Required | Description |
|---|
enable_metrics_reporting | bool | No | Enable worker metrics via OpenTelemetry. Default True. |
headers | dict[str, str] | None | No | - |
invocation_timeout_ms | int | No | Default timeout for trigger() in milliseconds. Default 30000. |
otel | OtelConfig | dict[str, Any] | None | No | OpenTelemetry configuration. Enabled by default. Set \{'enabled': False\} or env OTEL_ENABLED=false to disable. |
reconnection_config | ReconnectionConfig | None | No | WebSocket reconnection behavior. |
telemetry | TelemetryOptions | None | No | Internal telemetry metadata. |
worker_name | str | None | No | Display name for this worker. Defaults to hostname:pid. |
ReconnectionConfig
Configuration for WebSocket reconnection behavior.
| Name | Type | Required | Description |
|---|
backoff_multiplier | float | No | Exponential backoff multiplier. Default 2.0. |
initial_delay_ms | int | No | Starting delay in milliseconds. Default 1000. |
jitter_factor | float | No | Random jitter factor (0—1). Default 0.3. |
max_delay_ms | int | No | Maximum delay cap in milliseconds. Default 30000. |
max_retries | int | No | Maximum retry attempts. -1 for infinite. Default -1. |
TelemetryOptions
Telemetry metadata to be reported to the engine.
| Name | Type | Required | Description |
|---|
amplitude_api_key | str | None | No | Amplitude API key for product analytics. |
framework | str | None | No | Framework name, if applicable. |
language | str | None | No | Programming language of the worker (e.g. python). |
project_name | str | None | No | Name of the project this worker belongs to. |
HttpInvocationConfig
Config for HTTP external function invocation.
| Name | Type | Required | Description |
|---|
auth | HttpAuthConfig | None | No | Authentication configuration (bearer, HMAC, or API key). |
headers | dict[str, str] | None | No | Additional HTTP headers to include in the request. |
method | Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE'] | No | HTTP method. Defaults to 'POST'. |
timeout_ms | int | None | No | Request timeout in milliseconds. |
url | str | No | Target URL for the HTTP invocation. |
Format definition for function parameters.
| Name | Type | Required | Description |
|---|
body | list[RegisterFunctionFormat] | None | No | Nested fields for object types. |
description | str | None | No | Human-readable description of the parameter. |
items | RegisterFunctionFormat | None | No | Item schema for array types. |
name | str | Yes | Parameter name. |
required | bool | No | Whether the parameter is required. |
type | str | Yes | Type string (string, number, boolean, object, array, null, map). |
Input for registering a service (matches Node SDK’s RegisterServiceInput).
| Name | Type | Required | Description |
|---|
description | str | None | No | Description of the service. |
id | str | No | Unique service identifier. |
name | str | None | No | Human-readable service name. |
parent_service_id | str | None | No | ID of the parent service for hierarchical grouping. |
Input for registering a trigger (matches Node SDK’s RegisterTriggerInput).
| Name | Type | Required | Description |
|---|
config | Any | No | Trigger-type-specific configuration. |
function_id | str | No | ID of the function this trigger invokes. |
metadata | dict[str, Any] | None | No | Arbitrary metadata attached to the trigger. |
type | str | No | Trigger type identifier (e.g. http, queue, cron). |
Input for registering a trigger type.
| Name | Type | Required | Description |
|---|
call_request_format | Any | None | No | JSON Schema describing the payload sent to functions. |
description | str | No | Human-readable description of the trigger type. |
id | str | No | Unique identifier for the trigger type. |
trigger_request_format | Any | None | No | JSON Schema describing the expected trigger config. |
TriggerActionEnqueue
Routes the invocation through a named queue for async processing.
| Name | Type | Required | Description |
|---|
queue | str | Yes | Name of the target queue. |
type | Literal['enqueue'] | No | Always 'enqueue'. |
TriggerActionVoid
Fire-and-forget routing. No response is returned.
| Name | Type | Required | Description |
|---|
type | Literal['void'] | No | Always 'void'. |
TriggerRequest
Request object for trigger().
| Name | Type | Required | Description |
|---|
action | TriggerActionEnqueue | TriggerActionVoid | None | No | Routing action — None for sync, TriggerAction.Enqueue(...) for queue, TriggerAction.Void() for fire-and-forget. |
function_id | str | No | ID of the function to invoke. |
payload | Any | No | Data to pass to the function. |
timeout_ms | int | None | No | Override the default invocation timeout. |
IStream
Abstract interface for stream operations.
OtelConfig
Configuration for OpenTelemetry initialization.
| Name | Type | Required | Description |
|---|
enabled | bool | None | No | Enable OTel. Defaults to True. Set OTEL_ENABLED=false/0/no/off to disable. |
engine_ws_url | str | None | No | III Engine WebSocket URL. Defaults to env III_URL or ‘ws://localhost:49134’. |
fetch_instrumentation_enabled | bool | No | Auto-instrument urllib HTTP calls via URLLibInstrumentor. Defaults to True. |
logs_batch_size | int | None | No | Maximum number of log records exported per batch. Defaults to 1 when not set. |
logs_enabled | bool | None | No | Enable OTel log export via EngineLogExporter. Defaults to True when OTel is enabled. |
logs_flush_interval_ms | int | None | No | Log processor flush delay in milliseconds. Defaults to 100ms when not set. |
metrics_enabled | bool | No | Enable OTel metrics export via EngineMetricsExporter. Defaults to True. |
metrics_export_interval_ms | int | No | Metrics export interval in milliseconds. Defaults to 60000 (60 seconds). |
service_instance_id | str | None | No | Service instance ID. Defaults to a random UUID. |
service_name | str | None | No | Service name. Defaults to env OTEL_SERVICE_NAME or ‘iii-python-sdk’. |
service_namespace | str | None | No | Service namespace attribute. |
service_version | str | None | No | Service version. Defaults to env SERVICE_VERSION or ‘unknown’. |
TriggerHandler
Abstract base class for trigger handlers.