Python SDK
API reference for the iii SDK for Python.
Installation
pip install iii-sdkInitialization
Methods
create_channel
Create a streaming channel pair for worker-to-worker data transfer.
The returned Channel contains a local writer / reader
and their serializable refs (writer_ref, reader_ref) that
can be passed as fields in invocation data to other functions.
Signature
create_channel(buffer_size: int | None = None)Parameters
| Name | Type | Required | Description |
|---|---|---|---|
buffer_size | int | None | No | Buffer capacity for the channel. Defaults to 64. |
Example
>>> ch = iii.create_channel()
>>> fn = iii.register_function({"id": "producer"}, producer_handler)
>>> iii.trigger({"function_id": "producer", "payload": {"output": ch.writer_ref}})create_stream
Register a custom stream implementation, overriding the engine default.
Registers 5 of the 6 IStream methods (get, set, delete,
list, list_groups). The update method is not registered
-- atomic updates are handled by the engine's built-in stream update logic.
Signature
create_stream(stream_name: str, stream: IStream[Any])Parameters
| Name | Type | Required | Description |
|---|---|---|---|
stream_name | str | Yes | Unique name for the stream. |
stream | IStream[Any] | Yes | An object implementing the IStream interface. |
Example
>>> from iii.stream import IStream
>>> class MyStream(IStream):
... async def get(self, input): ...
... async def set(self, input): ...
... async def delete(self, input): ...
... async def list(self, input): ...
... async def list_groups(self, input): ...
... async def update(self, input): ...
>>> iii.create_stream("my-stream", MyStream())get_connection_state
Return the current WebSocket connection state.
Returns:
One of "disconnected", "connecting", "connected",
"reconnecting", or "failed".
Signature
get_connection_state()list_functions
List all functions registered with the engine across all workers.
Returns:
A list of FunctionInfo objects describing each function.
Examples:
for fn in iii.list_functions(): ... print(fn.function_id, fn.description)
Signature
list_functions()Example
>>> for fn in iii.list_functions():
... print(fn.function_id, fn.description)list_triggers
List all triggers registered with the engine.
Signature
list_triggers(include_internal: bool = False)Parameters
| Name | Type | Required | Description |
|---|---|---|---|
include_internal | bool | No | If True, include engine-internal triggers (e.g. functions-available). Defaults to False. |
Example
>>> triggers = iii.list_triggers()
>>> internal = iii.list_triggers(include_internal=True)list_workers
List all workers currently connected to the engine.
Returns:
A list of WorkerInfo objects with worker metadata.
Examples:
for w in iii.list_workers(): ... print(w.name, w.worker_id)
Signature
list_workers()Example
>>> for w in iii.list_workers():
... print(w.name, w.worker_id)on_functions_available
Subscribe to function-availability events from the engine.
The callback fires whenever the set of available functions changes (e.g. a new worker connects or a function is unregistered).
Signature
on_functions_available(callback: Callable[None])Parameters
| Name | Type | Required | Description |
|---|---|---|---|
callback | Callable[None] | Yes | Receives the current list of FunctionInfo objects each time availability changes. |
Example
>>> def on_change(functions):
... print("Available:", [f.function_id for f in functions])
>>> unsub = iii.on_functions_available(on_change)
>>> # later ...
>>> unsub()register_function
Register a function with the engine.
Pass a handler for local execution, or an HttpInvocationConfig
for HTTP-invoked functions (Lambda, Cloudflare Workers, etc.).
Signature
register_function(func: RegisterFunctionInput | dict[str, Any], handler_or_invocation: RemoteFunctionHandler | HttpInvocationConfig)Parameters
| Name | Type | Required | Description |
|---|---|---|---|
func | RegisterFunctionInput | dict[str, Any] | Yes | A RegisterFunctionInput or dict with id and optional description, metadata, request_format, response_format. |
handler_or_invocation | RemoteFunctionHandler | HttpInvocationConfig | Yes | Handler callable or HttpInvocationConfig. |
Example
>>> def greet(data):
... return {'message': f"Hello, {data['name']}!"}
>>> fn = iii.register_function({"id": "greet", "description": "Greets a user"}, greet)register_service
Register a logical service grouping with the engine.
Services provide an organisational hierarchy for functions. A
service can optionally reference a parent_service_id to form
a tree visible in the engine dashboard.
Signature
register_service(service: RegisterServiceInput | dict[str, Any])Parameters
| Name | Type | Required | Description |
|---|---|---|---|
service | RegisterServiceInput | dict[str, Any] | Yes | A RegisterServiceInput or dict with id and optional name, description, parent_service_id. |
Example
>>> iii.register_service({"id": "payments", "description": "Payment processing"})
>>> iii.register_service({
... "id": "payments::refunds",
... "description": "Refund sub-service",
... "parent_service_id": "payments",
... })register_trigger
Bind a trigger configuration to a registered function.
Signature
register_trigger(trigger: RegisterTriggerInput | dict[str, Any])Parameters
| Name | Type | Required | Description |
|---|---|---|---|
trigger | RegisterTriggerInput | dict[str, Any] | Yes | A RegisterTriggerInput or dict with type, function_id, and optional config. |
Example
>>> trigger = iii.register_trigger({
... 'type': 'http',
... 'function_id': 'greet',
... 'config': {'api_path': '/greet', 'http_method': 'GET'}
... })
>>> trigger = iii.register_trigger(RegisterTriggerInput(
... type="http", function_id="greet",
... config={'api_path': '/greet', 'http_method': 'GET'}
... ))
>>> trigger.unregister()register_trigger_type
Register a custom trigger type with the engine.
Signature
register_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any], handler: TriggerHandler[Any])Parameters
| Name | Type | Required | Description |
|---|---|---|---|
trigger_type | RegisterTriggerTypeInput | dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id and description. |
handler | TriggerHandler[Any] | Yes | Handler implementing register_trigger and unregister_trigger. |
Example
>>> iii.register_trigger_type({"id": "webhook", "description": "Webhook trigger"}, handler)
>>> iii.register_trigger_type(
... RegisterTriggerTypeInput(id="webhook", description="Webhook trigger"), handler
... )shutdown
Gracefully shut down the client, releasing all resources.
Cancels any pending reconnection attempts, rejects all in-flight invocations with an error, closes the WebSocket connection, and stops the background event-loop thread. After this call the instance must not be reused.
Examples:
iii = register_worker('ws://localhost:49134')
... do work ...
iii.shutdown()
Signature
shutdown()Example
>>> iii = register_worker('ws://localhost:49134')
>>> # ... do work ...
>>> iii.shutdown()trigger
Invoke a remote function.
The routing behavior and return type depend on the action field:
- No action: synchronous -- waits for the function to return.
TriggerAction.Enqueue(...): async via named queue -- returnsEnqueueResult.TriggerAction.Void(): fire-and-forget -- returnsNone.
Signature
trigger(request: dict[str, Any] | TriggerRequest)Parameters
| Name | Type | Required | Description |
|---|---|---|---|
request | dict[str, Any] | TriggerRequest | Yes | A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms. |
Example
>>> result = iii.trigger({'function_id': 'greet', 'payload': {'name': 'World'}})
>>> iii.trigger({'function_id': 'notify', 'payload': {}, 'action': TriggerAction.Void()})unregister_trigger_type
Unregister a previously registered trigger type.
Signature
unregister_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any])Parameters
| Name | Type | Required | Description |
|---|---|---|---|
trigger_type | RegisterTriggerTypeInput | dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id and optional description. |
Example
>>> iii.unregister_trigger_type({"id": "webhook", "description": "Webhook trigger"})
>>> iii.unregister_trigger_type(RegisterTriggerTypeInput(id="webhook", description="Webhook trigger"))Logger
Structured logger that emits logs as OpenTelemetry LogRecords.
Every log call automatically captures the active trace and span context,
correlating your logs with distributed traces without any manual wiring.
When OTel is not initialized, Logger gracefully falls back to Python
logging.
Pass structured data as the second argument to any log method. Using a dict of key-value pairs (instead of string interpolation) lets you filter, aggregate, and build dashboards in your observability backend.
Examples:
from iii import Logger logger = Logger()
Basic logging — trace context is injected automatically
logger.info('Worker connected')
Structured context for dashboards and alerting
logger.info('Order processed', {'order_id': 'ord_123', 'amount': 49.99, 'currency': 'USD'}) logger.warn('Retry attempt', {'attempt': 3, 'max_retries': 5, 'endpoint': '/api/charge'}) logger.error('Payment failed', {'order_id': 'ord_123', 'gateway': 'stripe', 'error_code': 'card_declined'})
debug
Log a debug-level message.
Signature
debug(message: str, data: Any = None)Parameters
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
>>> logger.debug('Cache lookup', {'key': 'user:42', 'hit': False})error
Log an error-level message.
Signature
error(message: str, data: Any = None)Parameters
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
>>> logger.error('Payment failed', {
... 'order_id': 'ord_123',
... 'gateway': 'stripe',
... 'error_code': 'card_declined',
... })info
Log an info-level message.
Signature
info(message: str, data: Any = None)Parameters
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
>>> logger.info('Order processed', {'order_id': 'ord_123', 'status': 'completed'})warn
Log a warning-level message.
Signature
warn(message: str, data: Any = None)Parameters
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
>>> logger.warn('Retry attempt', {'attempt': 3, 'max_retries': 5, 'endpoint': '/api/charge'})Types
HttpInvocationConfig · RegisterFunctionFormat · RegisterFunctionInput · RegisterServiceInput · RegisterTriggerInput · RegisterTriggerTypeInput · TriggerActionEnqueue · TriggerActionVoid · TriggerRequest · IStream · TriggerHandler
HttpInvocationConfig
Config for HTTP external function invocation.
| Name | Type | Required | Description |
|---|---|---|---|
auth | HttpAuthConfig | None | No | - |
headers | dict[str, str] | None | No | - |
method | Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE'] | No | - |
timeout_ms | int | None | No | - |
url | str | Yes | - |
RegisterFunctionFormat
Format definition for function parameters.
| Name | Type | Required | Description |
|---|---|---|---|
body | list[RegisterFunctionFormat] | None | No | - |
description | str | None | No | - |
items | RegisterFunctionFormat | None | No | - |
name | str | Yes | - |
required | bool | No | - |
type | str | Yes | - |
RegisterFunctionInput
Input for registering a function — matches Node.js RegisterFunctionInput.
| Name | Type | Required | Description |
|---|---|---|---|
description | str | None | No | - |
id | str | Yes | - |
invocation | HttpInvocationConfig | None | No | - |
metadata | dict[str, Any] | None | No | - |
request_format | RegisterFunctionFormat | None | No | - |
response_format | RegisterFunctionFormat | None | No | - |
RegisterServiceInput
Input for registering a service (matches Node SDK's RegisterServiceInput).
| Name | Type | Required | Description |
|---|---|---|---|
description | str | None | No | - |
id | str | Yes | - |
name | str | None | No | - |
parent_service_id | str | None | No | - |
RegisterTriggerInput
Input for registering a trigger (matches Node SDK's RegisterTriggerInput).
| Name | Type | Required | Description |
|---|---|---|---|
config | Any | No | - |
function_id | str | Yes | - |
type | str | Yes | - |
RegisterTriggerTypeInput
Input for registering a trigger type (matches Node SDK's RegisterTriggerTypeInput).
| Name | Type | Required | Description |
|---|---|---|---|
description | str | Yes | - |
id | str | Yes | - |
TriggerActionEnqueue
Routes the invocation through a named queue for async processing.
| Name | Type | Required | Description |
|---|---|---|---|
queue | str | Yes | - |
type | Literal['enqueue'] | No | - |
TriggerActionVoid
Fire-and-forget routing. No response is returned.
| Name | Type | Required | Description |
|---|---|---|---|
type | Literal['void'] | No | - |
TriggerRequest
Request object for trigger().
| Name | Type | Required | Description |
|---|---|---|---|
action | TriggerActionEnqueue | TriggerActionVoid | None | No | - |
function_id | str | Yes | - |
payload | Any | No | - |
timeout_ms | int | None | No | - |
IStream
Abstract interface for stream operations.
TriggerHandler
Abstract base class for trigger handlers.