Python SDK

API reference for the iii SDK for Python.

Installation

pip install iii-sdk

Initialization

Methods

create_channel

Create a streaming channel pair for worker-to-worker data transfer.

The returned Channel contains a local writer / reader and their serializable refs (writer_ref, reader_ref) that can be passed as fields in invocation data to other functions.

Signature

create_channel(buffer_size: int | None = None)

Parameters

Name         Type        Required  Description
buffer_size  int | None  No        Buffer capacity for the channel. Defaults to 64.

Example

    >>> ch = iii.create_channel()
    >>> fn = iii.register_function({"id": "producer"}, producer_handler)
    >>> iii.trigger({"function_id": "producer", "payload": {"output": ch.writer_ref}})
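The effect of a bounded buffer can be illustrated without the SDK. The sketch below uses asyncio.Queue as a stand-in for the channel; it is not the SDK's implementation, and whether a full channel makes the writer wait in the same way is an assumption. The names producer, consumer, and run_pipeline are hypothetical.

```python
import asyncio


async def producer(queue: asyncio.Queue, items: list) -> None:
    """Write every item, then a None sentinel to mark the end of the stream."""
    for item in items:
        # put() waits while the queue is full, so the producer can never be
        # more than `maxsize` items ahead of the consumer.
        await queue.put(item)
    await queue.put(None)


async def consumer(queue: asyncio.Queue) -> list:
    """Read items until the sentinel arrives."""
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    return received


async def run_pipeline(buffer_size: int, items: list) -> list:
    # Stand-in for a channel created with a given buffer_size (assumption).
    queue: asyncio.Queue = asyncio.Queue(maxsize=buffer_size)
    _, received = await asyncio.gather(producer(queue, items), consumer(queue))
    return received


result = asyncio.run(run_pipeline(buffer_size=2, items=[1, 2, 3, 4, 5]))
```

A small buffer bounds memory use at the cost of more waiting on the writer side; a larger one smooths out bursts.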

create_stream

Register a custom stream implementation, overriding the engine default.

Registers 5 of the 6 IStream methods (get, set, delete, list, list_groups). The update method is not registered -- atomic updates are handled by the engine's built-in stream update logic.

Signature

create_stream(stream_name: str, stream: IStream[Any])

Parameters

Name         Type          Required  Description
stream_name  str           Yes       Unique name for the stream.
stream       IStream[Any]  Yes       An object implementing the IStream interface.

Example

    >>> from iii.stream import IStream
    >>> class MyStream(IStream):
    ...     async def get(self, input): ...
    ...     async def set(self, input): ...
    ...     async def delete(self, input): ...
    ...     async def list(self, input): ...
    ...     async def list_groups(self, input): ...
    ...     async def update(self, input): ...
    >>> iii.create_stream("my-stream", MyStream())
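As a rough illustration of the five registered methods, here is a minimal in-memory sketch. It is a plain class so that it runs without the SDK; a real implementation would subclass IStream. The shape of each input (a dict with group_id, item_id, and data keys) is an assumption made for this example, not something this reference specifies.

```python
import asyncio
from typing import Any


class InMemoryStream:
    """Stand-in stream that keeps items in nested dicts: group -> item -> data."""

    def __init__(self) -> None:
        self._groups = {}

    async def get(self, input: dict) -> Any:
        # Returns None when the group or item does not exist.
        return self._groups.get(input["group_id"], {}).get(input["item_id"])

    async def set(self, input: dict) -> Any:
        self._groups.setdefault(input["group_id"], {})[input["item_id"]] = input["data"]
        return input["data"]

    async def delete(self, input: dict) -> None:
        self._groups.get(input["group_id"], {}).pop(input["item_id"], None)

    async def list(self, input: dict):
        # All items of one group, in insertion order.
        return [*self._groups.get(input["group_id"], {}).values()]

    async def list_groups(self, input: dict):
        return sorted(self._groups)


async def demo():
    stream = InMemoryStream()
    await stream.set({"group_id": "room-1", "item_id": "a", "data": {"n": 1}})
    await stream.set({"group_id": "room-1", "item_id": "b", "data": {"n": 2}})
    await stream.delete({"group_id": "room-1", "item_id": "a"})
    items = await stream.list({"group_id": "room-1"})
    groups = await stream.list_groups({})
    return items, groups


items, groups = asyncio.run(demo())
```

The update method is left out on purpose, since the engine handles atomic updates itself.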

get_connection_state

Return the current WebSocket connection state.

Returns: One of "disconnected", "connecting", "connected", "reconnecting", or "failed".

Signature

get_connection_state()
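One possible use is to poll the state before sending work. The helper below is a sketch, not part of the SDK: wait_until_connected is a hypothetical name, and treating "failed" as terminal is an assumption. It takes any zero-argument callable that returns a state string, so it is exercised here with a fake state source.

```python
import time
from typing import Callable


def wait_until_connected(
    get_state: Callable[[], str],
    timeout_s: float = 5.0,
    poll_s: float = 0.05,
) -> bool:
    """Poll get_state() until it reports "connected", fails, or times out."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        state = get_state()
        if state == "connected":
            return True
        if state == "failed":
            # Assumption: "failed" is terminal, so there is no point in waiting.
            return False
        time.sleep(poll_s)
    return False


# Fake state source standing in for iii.get_connection_state.
states = iter(["connecting", "connecting", "connected"])
ok = wait_until_connected(lambda: next(states), timeout_s=1.0, poll_s=0.01)
```

With a real client the call would presumably be wait_until_connected(iii.get_connection_state).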

list_functions

List all functions registered with the engine across all workers.

Returns: A list of FunctionInfo objects describing each function.

Signature

list_functions()

Example

    >>> for fn in iii.list_functions():
    ...     print(fn.function_id, fn.description)

list_triggers

List all triggers registered with the engine.

Signature

list_triggers(include_internal: bool = False)

Parameters

Name              Type  Required  Description
include_internal  bool  No        If True, include engine-internal triggers (e.g. functions-available). Defaults to False.

Example

    >>> triggers = iii.list_triggers()
    >>> internal = iii.list_triggers(include_internal=True)

list_workers

List all workers currently connected to the engine.

Returns: A list of WorkerInfo objects with worker metadata.

Signature

list_workers()

Example

    >>> for w in iii.list_workers():
    ...     print(w.name, w.worker_id)

on_functions_available

Subscribe to function-availability events from the engine.

The callback fires whenever the set of available functions changes (e.g. a new worker connects or a function is unregistered).

Returns: A callable that removes the subscription when called.

Signature

on_functions_available(callback: Callable[[list[FunctionInfo]], None])

Parameters

Name      Type                                  Required  Description
callback  Callable[[list[FunctionInfo]], None]  Yes       Receives the current list of FunctionInfo objects each time availability changes.

Example

    >>> def on_change(functions):
    ...     print("Available:", [f.function_id for f in functions])
    >>> unsub = iii.on_functions_available(on_change)
    >>> # later ...
    >>> unsub()
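Because the callback receives the full current list rather than a delta, a caller that wants to know what changed has to compare against the previous call. The sketch below shows one way to do that; make_diff_callback is a hypothetical helper, and SimpleNamespace objects stand in for FunctionInfo (only the function_id attribute is used).

```python
from types import SimpleNamespace


def make_diff_callback(report):
    """Build a callback that reports added and removed function IDs."""
    known = set()

    def on_change(functions) -> None:
        nonlocal known
        current = {f.function_id for f in functions}
        # Sorted lists keep the report deterministic.
        report(sorted(current - known), sorted(known - current))
        known = current

    return on_change


events = []
on_change = make_diff_callback(lambda added, removed: events.append((added, removed)))

# Simulate three availability updates with stand-in FunctionInfo objects.
on_change([SimpleNamespace(function_id="greet")])
on_change([SimpleNamespace(function_id="greet"), SimpleNamespace(function_id="notify")])
on_change([SimpleNamespace(function_id="notify")])
```

The resulting on_change could then be passed to iii.on_functions_available in place of the simpler callback above.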

register_function

Register a function with the engine.

Pass a handler for local execution, or an HttpInvocationConfig for HTTP-invoked functions (Lambda, Cloudflare Workers, etc.).

Signature

register_function(func: RegisterFunctionInput | dict[str, Any], handler_or_invocation: RemoteFunctionHandler | HttpInvocationConfig)

Parameters

Name                   Type                                          Required  Description
func                   RegisterFunctionInput | dict[str, Any]        Yes       A RegisterFunctionInput or dict with id and optional description, metadata, request_format, response_format.
handler_or_invocation  RemoteFunctionHandler | HttpInvocationConfig  Yes       Handler callable or HttpInvocationConfig.

Example

    >>> def greet(data):
    ...     return {'message': f"Hello, {data['name']}!"}
    >>> fn = iii.register_function({"id": "greet", "description": "Greets a user"}, greet)

register_service

Register a logical service grouping with the engine.

Services provide an organisational hierarchy for functions. A service can optionally reference a parent_service_id to form a tree visible in the engine dashboard.

Signature

register_service(service: RegisterServiceInput | dict[str, Any])

Parameters

Name     Type                                   Required  Description
service  RegisterServiceInput | dict[str, Any]  Yes       A RegisterServiceInput or dict with id and optional name, description, parent_service_id.

Example

    >>> iii.register_service({"id": "payments", "description": "Payment processing"})
    >>> iii.register_service({
    ...     "id": "payments::refunds",
    ...     "description": "Refund sub-service",
    ...     "parent_service_id": "payments",
    ... })
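When service IDs follow the "::" naming seen in the example above, the parent chain can be derived from the ID itself. The helper below is a sketch under that assumption; service_chain is a hypothetical name, and the SDK does not necessarily require or interpret this naming scheme.

```python
def service_chain(path: str, separator: str = "::") -> list:
    """Expand "a::b::c" into service dicts for a, a::b, and a::b::c."""
    parts = path.split(separator)
    services = []
    for depth in range(1, len(parts) + 1):
        service = {"id": separator.join(parts[:depth])}
        if depth > 1:
            # Each nested service points at the service one level up.
            service["parent_service_id"] = separator.join(parts[: depth - 1])
        services.append(service)
    return services


chain = service_chain("payments::refunds::disputes")
```

Each dict in chain has the shape register_service accepts, so a loop such as `for service in chain: iii.register_service(service)` would register parents before children.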

register_trigger

Bind a trigger configuration to a registered function.

Signature

register_trigger(trigger: RegisterTriggerInput | dict[str, Any])

Parameters

Name     Type                                   Required  Description
trigger  RegisterTriggerInput | dict[str, Any]  Yes       A RegisterTriggerInput or dict with type, function_id, and optional config.

Example

    >>> trigger = iii.register_trigger({
    ...   'type': 'http',
    ...   'function_id': 'greet',
    ...   'config': {'api_path': '/greet', 'http_method': 'GET'}
    ... })
    >>> trigger = iii.register_trigger(RegisterTriggerInput(
    ...     type="http", function_id="greet",
    ...     config={'api_path': '/greet', 'http_method': 'GET'}
    ... ))
    >>> trigger.unregister()

register_trigger_type

Register a custom trigger type with the engine.

Signature

register_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any], handler: TriggerHandler[Any])

Parameters

Name          Type                                       Required  Description
trigger_type  RegisterTriggerTypeInput | dict[str, Any]  Yes       A RegisterTriggerTypeInput or dict with id and description.
handler       TriggerHandler[Any]                        Yes       Handler implementing register_trigger and unregister_trigger.

Example

    >>> iii.register_trigger_type({"id": "webhook", "description": "Webhook trigger"}, handler)
    >>> iii.register_trigger_type(
    ...     RegisterTriggerTypeInput(id="webhook", description="Webhook trigger"), handler
    ... )
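The examples above pass a handler without showing one. The sketch below suggests what such a handler might look like. It is a plain class so that it runs without the SDK; a real handler would subclass TriggerHandler. The method signatures (async, taking one object with function_id and config attributes) and the "path" config key are assumptions made for illustration.

```python
import asyncio
from types import SimpleNamespace


class WebhookTriggerHandler:
    """Stand-in handler that maps webhook paths to function IDs."""

    def __init__(self) -> None:
        self.routes = {}

    async def register_trigger(self, trigger) -> None:
        # Remember which function should run for this webhook path.
        self.routes[trigger.config["path"]] = trigger.function_id

    async def unregister_trigger(self, trigger) -> None:
        # Removing an unknown path is a no-op.
        self.routes.pop(trigger.config["path"], None)


handler = WebhookTriggerHandler()
# Stand-in for the trigger object the engine would hand to the handler.
trigger = SimpleNamespace(function_id="greet", config={"path": "/hooks/greet"})

asyncio.run(handler.register_trigger(trigger))
routes_after_register = dict(handler.routes)
asyncio.run(handler.unregister_trigger(trigger))
```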

shutdown

Gracefully shut down the client, releasing all resources.

Cancels any pending reconnection attempts, rejects all in-flight invocations with an error, closes the WebSocket connection, and stops the background event-loop thread. After this call the instance must not be reused.

Signature

shutdown()

Example

    >>> iii = register_worker('ws://localhost:49134')
    >>> # ... do work ...
    >>> iii.shutdown()

trigger

Invoke a remote function.

The routing behavior and return type depend on the action field:

  • No action: synchronous -- waits for the function to return.
  • TriggerAction.Enqueue(...): async via named queue -- returns EnqueueResult.
  • TriggerAction.Void(): fire-and-forget -- returns None.

Signature

trigger(request: dict[str, Any] | TriggerRequest)

Parameters

Name     Type                             Required  Description
request  dict[str, Any] | TriggerRequest  Yes       A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms.

Example

    >>> result = iii.trigger({'function_id': 'greet', 'payload': {'name': 'World'}})
    >>> iii.trigger({'function_id': 'notify', 'payload': {}, 'action': TriggerAction.Void()})
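Since the request dict has two required keys and two optional ones, a small builder can keep call sites consistent. This is only a sketch: build_request is a hypothetical helper, and leaving optional keys out (rather than sending None) is a design choice made here, not documented SDK behavior.

```python
from typing import Any, Optional


def build_request(
    function_id: str,
    payload: Any = None,
    *,
    action: Any = None,
    timeout_ms: Optional[int] = None,
) -> dict:
    """Assemble a trigger request dict, omitting optional keys that are unset."""
    request = {"function_id": function_id, "payload": payload}
    if action is not None:
        request["action"] = action
    if timeout_ms is not None:
        request["timeout_ms"] = timeout_ms
    return request


sync_request = build_request("greet", {"name": "World"}, timeout_ms=2000)
plain_request = build_request("notify", {})
```

The result can be passed straight to iii.trigger; a value such as TriggerAction.Void() would go in the action argument.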

unregister_trigger_type

Unregister a previously registered trigger type.

Signature

unregister_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any])

Parameters

Name          Type                                       Required  Description
trigger_type  RegisterTriggerTypeInput | dict[str, Any]  Yes       A RegisterTriggerTypeInput or dict with id and optional description.

Example

    >>> iii.unregister_trigger_type({"id": "webhook", "description": "Webhook trigger"})
    >>> iii.unregister_trigger_type(RegisterTriggerTypeInput(id="webhook", description="Webhook trigger"))

Logger

Structured logger that emits logs as OpenTelemetry LogRecords.

Every log call automatically captures the active trace and span context, correlating your logs with distributed traces without any manual wiring. When OTel is not initialized, Logger gracefully falls back to Python logging.

Pass structured data as the second argument to any log method. Using a dict of key-value pairs (instead of string interpolation) lets you filter, aggregate, and build dashboards in your observability backend.

Example

    >>> from iii import Logger
    >>> logger = Logger()
    >>> # Basic logging: trace context is injected automatically
    >>> logger.info('Worker connected')
    >>> # Structured context for dashboards and alerting
    >>> logger.info('Order processed', {'order_id': 'ord_123', 'amount': 49.99, 'currency': 'USD'})
    >>> logger.warn('Retry attempt', {'attempt': 3, 'max_retries': 5, 'endpoint': '/api/charge'})
    >>> logger.error('Payment failed', {'order_id': 'ord_123', 'gateway': 'stripe', 'error_code': 'card_declined'})
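To make the idea of a logging fallback concrete, the sketch below shows one way a message plus structured data could be routed through the standard logging module. It is not the SDK's actual fallback: log_with_data and ListHandler are hypothetical names, and the JSON suffix format is an assumption.

```python
import json
import logging
from typing import Any


def log_with_data(logger: logging.Logger, level: int, message: str, data: Any = None) -> None:
    """Log a message, appending structured data as sorted JSON when present."""
    if data is None:
        logger.log(level, message)
    else:
        # default=str keeps non-JSON values (dates, UUIDs) from raising.
        logger.log(level, "%s %s", message, json.dumps(data, sort_keys=True, default=str))


class ListHandler(logging.Handler):
    """Collects formatted messages in memory so the output can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.lines = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(record.getMessage())


fallback = logging.getLogger("iii.fallback.demo")
fallback.setLevel(logging.DEBUG)
fallback.propagate = False  # keep the demo output out of the root logger
capture = ListHandler()
fallback.addHandler(capture)

log_with_data(fallback, logging.INFO, "Order processed", {"order_id": "ord_123", "amount": 49.99})
log_with_data(fallback, logging.WARNING, "Worker idle")
```

Keeping the data as a dict until the last moment is what makes the same call usable for both structured backends and plain text logs.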

debug

Log a debug-level message.

Signature

debug(message: str, data: Any = None)

Parameters

Name     Type  Required  Description
message  str   Yes       Human-readable log message.
data     Any   No        Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

    >>> logger.debug('Cache lookup', {'key': 'user:42', 'hit': False})

error

Log an error-level message.

Signature

error(message: str, data: Any = None)

Parameters

Name     Type  Required  Description
message  str   Yes       Human-readable log message.
data     Any   No        Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

    >>> logger.error('Payment failed', {
    ...     'order_id': 'ord_123',
    ...     'gateway': 'stripe',
    ...     'error_code': 'card_declined',
    ... })

info

Log an info-level message.

Signature

info(message: str, data: Any = None)

Parameters

Name     Type  Required  Description
message  str   Yes       Human-readable log message.
data     Any   No        Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

    >>> logger.info('Order processed', {'order_id': 'ord_123', 'status': 'completed'})

warn

Log a warning-level message.

Signature

warn(message: str, data: Any = None)

Parameters

Name     Type  Required  Description
message  str   Yes       Human-readable log message.
data     Any   No        Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

    >>> logger.warn('Retry attempt', {'attempt': 3, 'max_retries': 5, 'endpoint': '/api/charge'})

Types

HttpInvocationConfig · RegisterFunctionFormat · RegisterFunctionInput · RegisterServiceInput · RegisterTriggerInput · RegisterTriggerTypeInput · TriggerActionEnqueue · TriggerActionVoid · TriggerRequest · IStream · TriggerHandler

HttpInvocationConfig

Config for HTTP external function invocation.

Name        Type                                              Required  Description
auth        HttpAuthConfig | None                             No        -
headers     dict[str, str] | None                             No        -
method      Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE']  No        -
timeout_ms  int | None                                        No        -
url         str                                               Yes       -

RegisterFunctionFormat

Format definition for function parameters.

Name         Type                                 Required  Description
body         list[RegisterFunctionFormat] | None  No        -
description  str | None                           No        -
items        RegisterFunctionFormat | None        No        -
name         str                                  Yes       -
required     bool                                 No        -
type         str                                  Yes       -

RegisterFunctionInput

Input for registering a function (matches Node SDK's RegisterFunctionInput).

Name             Type                           Required  Description
description      str | None                     No        -
id               str                            Yes       -
invocation       HttpInvocationConfig | None    No        -
metadata         dict[str, Any] | None          No        -
request_format   RegisterFunctionFormat | None  No        -
response_format  RegisterFunctionFormat | None  No        -

RegisterServiceInput

Input for registering a service (matches Node SDK's RegisterServiceInput).

Name               Type        Required  Description
description        str | None  No        -
id                 str         Yes       -
name               str | None  No        -
parent_service_id  str | None  No        -

RegisterTriggerInput

Input for registering a trigger (matches Node SDK's RegisterTriggerInput).

Name         Type  Required  Description
config       Any   No        -
function_id  str   Yes       -
type         str   Yes       -

RegisterTriggerTypeInput

Input for registering a trigger type (matches Node SDK's RegisterTriggerTypeInput).

Name         Type  Required  Description
description  str   Yes       -
id           str   Yes       -

TriggerActionEnqueue

Routes the invocation through a named queue for async processing.

Name   Type                Required  Description
queue  str                 Yes       -
type   Literal['enqueue']  No        -

TriggerActionVoid

Fire-and-forget routing. No response is returned.

Name  Type             Required  Description
type  Literal['void']  No        -

TriggerRequest

Request object for trigger().

Name         Type                                             Required  Description
action       TriggerActionEnqueue | TriggerActionVoid | None  No        -
function_id  str                                              Yes       -
payload      Any                                              No        -
timeout_ms   int | None                                       No        -

IStream

Abstract interface for stream operations.

TriggerHandler

Abstract base class for trigger handlers.
