Installation

pip install iii-helpers
API reference for the iii-helpers package (Python).

http

HTTP request/response types, auth config, and the http helper.

Import
from iii_helpers.http import ...

Functions

http

Wrap a streaming handler so it receives typed StreamRequest and StreamResponse objects. Takes a callback (req, res) -> HttpResponse | None and returns a function that the iii engine can invoke directly. The wrapper converts the raw dict (or InternalHttpRequest) delivered by the engine into the typed StreamRequest / StreamResponse pair that the callback expects.

Signature
http(callback: Callable[Awaitable[HttpResponse[Any] | None]])

Parameters

Name      Type                                           Required  Description
callback  Callable[Awaitable[HttpResponse[Any] | None]]  Yes       -

Types

HttpAuthApiKey · HttpAuthBearer · HttpAuthHmac · HttpInvocationConfig · HttpRequest · HttpResponse

HttpAuthApiKey

API key sent via a custom header.

Name       Type                Required  Description
header     str                 No        -
type       Literal['api_key']  No        -
value_key  str                 No        -

HttpAuthBearer

Bearer token authentication.

Name       Type               Required  Description
token_key  str                No        -
type       Literal['bearer']  No        -

HttpAuthHmac

HMAC signature verification using a shared secret.

Name        Type             Required  Description
secret_key  str              No        -
type        Literal['hmac']  No        -
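Shared-secret HMAC verification generally follows the pattern sketched below. This is a stdlib-only illustration, not the engine's implementation: the SHA-256 digest, the hex encoding, and the helper names sign_body and verify_signature are all assumptions, since this page does not state which algorithm or header format the engine uses.

```python
import hashlib
import hmac


def sign_body(secret: bytes, body: bytes) -> str:
    """Return a hex HMAC signature of body (SHA-256 is an assumption)."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, body: bytes, signature: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = sign_body(secret, body)
    return hmac.compare_digest(expected, signature)


# Sample values for illustration only.
secret = b"shared-secret"
body = b'{"event": "ping"}'
signature = sign_body(secret, body)
```

The constant-time comparison matters: a plain == on the two strings can leak how many leading characters matched.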

HttpInvocationConfig

Config for HTTP external function invocation.

Name        Type                   Required  Description
auth        HttpAuthConfig | None  No        Authentication configuration (bearer, HMAC, or API key).
headers     dict[str, str] | None  No        Additional HTTP headers to include in the request.
method      HttpMethod             No        HTTP method. Defaults to 'POST'.
timeout_ms  int | None             No        Request timeout in milliseconds.
url         str                    No        Target URL for the HTTP invocation.
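To make the field meanings concrete, the sketch below turns a plain dict with the same keys into a stdlib urllib request. It does not use iii_helpers, and build_request is a hypothetical helper. The JSON body encoding and the milliseconds-to-seconds conversion are assumptions about how such a config might be consumed.

```python
import json
import urllib.request
from typing import Optional


def build_request(config: dict, payload: dict) -> tuple[urllib.request.Request, Optional[float]]:
    """Build (but do not send) a request from an invocation-config-like dict."""
    request = urllib.request.Request(
        config["url"],
        data=json.dumps(payload).encode("utf-8"),
        headers=dict(config.get("headers") or {}),
        # The documented default method is 'POST'.
        method=config.get("method") or "POST",
    )
    timeout_ms = config.get("timeout_ms")
    # urllib expects seconds, the config carries milliseconds.
    timeout_s = None if timeout_ms is None else timeout_ms / 1000
    return request, timeout_s


config = {
    "url": "https://example.invalid/hook",
    "headers": {"Accept": "application/json"},
    "timeout_ms": 2500,
}
request, timeout_s = build_request(config, {"ok": True})
```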

HttpRequest

Represents a buffered HTTP request.

Name          Type                        Required  Description
body          Any | None                  No        -
headers       dict[str, str | list[str]]  No        -
method        str                         No        -
path_params   dict[str, str]              No        -
query_params  dict[str, str | list[str]]  No        -

HttpResponse

Represents a buffered HTTP response.

Name          Type            Required  Description
body          Any | None      No        -
headers       dict[str, str]  No        -
model_config  Any             No        -
status_code   int             No        -

observability

Logger, OpenTelemetry config, and span helpers.

Import
from iii_helpers.observability import ...

Functions

current_span_id

Return the current active span_id as a 16-char hex string, or None when unavailable.

Signature
current_span_id()

current_span_is_recording

Returns False when there is no active span or the sampler dropped it.

Signature
current_span_is_recording()

current_trace_id

Return the current active trace_id as a 32-char hex string, or None when unavailable.

Signature
current_trace_id()
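The 32-char and 16-char hex forms match the W3C Trace Context encodings of a 128-bit trace ID and a 64-bit span ID. The sketch below shows that zero-padded formatting with plain integers. The helper names are hypothetical, and it does not call into iii_helpers.

```python
def format_trace_id(trace_id: int) -> str:
    """Render a 128-bit trace ID as 32 lowercase hex characters."""
    return format(trace_id, "032x")


def format_span_id(span_id: int) -> str:
    """Render a 64-bit span ID as 16 lowercase hex characters."""
    return format(span_id, "016x")


# Sample IDs for illustration only.
trace_hex = format_trace_id(0x4BF92F3577B34DA6A3CE929D0E0E4736)
span_hex = format_span_id(0xF067AA0BA902B7)
```

Note that leading zeros are preserved, so the span ID above renders with two zero characters of padding.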

execute_traced_request

Execute an httpx Request inside an OTel CLIENT span.
  • Injects W3C traceparent into outgoing request headers.
  • Records HTTP semantic-convention attributes on the span.
  • Sets ERROR span status for responses with status >= 400.
  • Records exceptions for network-level errors.

Signature
async execute_traced_request(client, request)

Parameters

Name     Type  Required  Description
client   Any   Yes       -
request  Any   Yes       -

extract_baggage

Extract baggage from a W3C baggage header string.

Signature
extract_baggage(baggage: str)

Parameters

Name     Type  Required  Description
baggage  str   Yes       -
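For readers unfamiliar with the header format, a W3C baggage header is a comma-separated list of key=value members, each optionally followed by ;-separated properties, with values percent-encoded. The parser below is a simplified stdlib sketch of that format. The name parse_baggage is hypothetical, and the return type of extract_baggage is not documented here, so the plain dict is an assumption.

```python
from urllib.parse import unquote


def parse_baggage(header: str) -> dict[str, str]:
    """Parse a W3C baggage header into a key -> decoded value mapping."""
    entries: dict[str, str] = {}
    for member in header.split(","):
        # Drop optional ";property" metadata after the value.
        key_value = member.split(";", 1)[0]
        if "=" not in key_value:
            continue  # skip empty or malformed members
        key, value = key_value.split("=", 1)
        key = key.strip()
        if key:
            entries[key] = unquote(value.strip())
    return entries


parsed = parse_baggage("userId=alice, serverNode=DF%2028;ttl=30")
```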

extract_traceparent

Extract a trace context from a W3C traceparent header string.

Signature
extract_traceparent(traceparent: str)

Parameters

Name         Type  Required  Description
traceparent  str   Yes       -
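A version-00 traceparent header has four dash-separated lowercase hex fields: version, 32-char trace ID, 16-char parent (span) ID, and 2-char flags. The stdlib sketch below validates and splits such a header. ParsedTraceparent and parse_traceparent are hypothetical names. What extract_traceparent actually returns is not documented on this page.

```python
import re
from typing import NamedTuple, Optional

# Strict version-00 layout; future versions may carry extra fields.
_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


class ParsedTraceparent(NamedTuple):
    version: str
    trace_id: str
    parent_id: str
    sampled: bool


def parse_traceparent(header: str) -> Optional[ParsedTraceparent]:
    """Return the parsed fields, or None when the header is invalid."""
    match = _TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    version, trace_id, parent_id, flags = match.groups()
    # The spec forbids version ff and all-zero trace or parent IDs.
    if version == "ff" or trace_id == "0" * 32 or parent_id == "0" * 16:
        return None
    # Bit 0 of the flags byte is the "sampled" flag.
    return ParsedTraceparent(version, trace_id, parent_id, bool(int(flags, 16) & 0x01))


parsed_tp = parse_traceparent(
    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
)
```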

flush_otel

Force-flush all OTel providers without tearing them down. Counterpart to shutdown_otel. Use before short-lived process exits where you want pending spans/metrics/logs delivered but plan to keep using OTel afterwards.

Signature
async flush_otel()

init_otel

Initialize OpenTelemetry. Subsequent calls are no-ops.

Signature
init_otel(config: OtelConfig | None = None, loop: None = None)

Parameters

Name    Type               Required  Description
config  OtelConfig | None  No        -
loop    None               No        -

inject_baggage

Inject the current baggage into a W3C baggage header string.

Signature
inject_baggage()

inject_traceparent

Inject the current trace context into a W3C traceparent header string.

Signature
inject_traceparent()

record_span_event

No-op when the current span is not recording.

Signature
record_span_event(name: str, attrs: dict[str, Any] | None = None)

Parameters

Name   Type                   Required  Description
name   str                    Yes       -
attrs  dict[str, Any] | None  No        -

redact

Signature
redact(value: Any)

Parameters

Name   Type  Required  Description
value  Any   Yes       -

redact_and_truncate

Signature
redact_and_truncate(value: Any, max_bytes: Optional[int] = None)

Parameters

Name       Type           Required  Description
value      Any            Yes       -
max_bytes  Optional[int]  No        -

resolve_max_bytes_from_env

Signature
resolve_max_bytes_from_env()

set_current_span_attribute

No-op when the current span is not recording.

Signature
set_current_span_attribute(key: str, value: Any)

Parameters

Name   Type  Required  Description
key    str   Yes       -
value  Any   Yes       -

set_current_span_error

No-op when there is no active span.

Signature
set_current_span_error(message: str)

Parameters

Name     Type  Required  Description
message  str   Yes       -

shutdown_otel

Shut down OTel synchronously (best-effort; does not await the WebSocket flush).

Signature
shutdown_otel()

with_span

Start a new span and run fn(span) within it. If the tracer is not initialized, fn is called with a no-op span that silently ignores attribute/event calls.

Signature
async with_span(name: str, fn: Any, kind: Any = None, traceparent: str | None = None)

Parameters

Name         Type        Required  Description
name         str         Yes       -
fn           Any         Yes       -
kind         Any         No        -
traceparent  str | None  No        -

Types

BaggageSpanProcessor · DEFAULT_ALLOWLIST · Logger · OtelConfig · ReconnectionConfig

BaggageSpanProcessor

DEFAULT_ALLOWLIST

DEFAULT_ALLOWLIST: tuple[str, ...]

Logger

Structured logger that emits logs as OpenTelemetry LogRecords. Every log call automatically captures the active trace and span context, correlating your logs with distributed traces without any manual wiring. When OTel is not initialized, Logger gracefully falls back to Python logging. Pass structured data as the second argument to any log method. Using a dict of key-value pairs (instead of string interpolation) lets you filter, aggregate, and build dashboards in your observability backend.
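The difference between string interpolation and structured fields can be seen with the standard library alone. The sketch below uses Python's logging module (the documented fallback target), not the Logger class itself, whose method signatures are not listed on this page. ListHandler and the field names are made up for the demonstration.

```python
import logging


class ListHandler(logging.Handler):
    """Collects records in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


handler = ListHandler()
log = logging.getLogger("structured-demo")
log.setLevel(logging.INFO)
log.addHandler(handler)
log.propagate = False

# Interpolation: the fields are baked into the message text.
log.info("order %s failed after %d retries", "ord_1", 3)

# Structured: the message stays constant and the fields travel separately,
# so a backend can filter or aggregate on them.
log.info("order failed", extra={"order_id": "ord_1", "retries": 3})
```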

OtelConfig

Configuration for OpenTelemetry initialization.

Name                           Type         Required  Description
enabled                        bool | None  No        Enable OTel. Defaults to True. Set OTEL_ENABLED=false/0/no/off to disable.
engine_ws_url                  str | None   No        III Engine WebSocket URL. Defaults to env III_URL or 'ws://localhost:49134'.
fetch_instrumentation_enabled  bool         No        Auto-instrument urllib HTTP calls via URLLibInstrumentor. Defaults to True.
logs_batch_size                int | None   No        Maximum number of log records exported per batch. Defaults to 1 when not set.
logs_enabled                   bool | None  No        Enable OTel log export via EngineLogExporter. Defaults to True when OTel is enabled.
logs_flush_interval_ms         int | None   No        Log processor flush delay in milliseconds. Defaults to 100ms when not set.
metrics_enabled                bool         No        Enable OTel metrics export via EngineMetricsExporter. Defaults to True.
metrics_export_interval_ms     int          No        Metrics export interval in milliseconds. Defaults to 60000 (60 seconds).
service_instance_id            str | None   No        Service instance ID. Defaults to a random UUID.
service_name                   str | None   No        Service name. Defaults to env OTEL_SERVICE_NAME or 'iii-python-sdk'.
service_namespace              str | None   No        Service namespace attribute.
service_version                str | None   No        Service version. Defaults to env SERVICE_VERSION or 'unknown'.
spans_flush_interval_ms        int | None   No        Span processor flush delay in milliseconds. Defaults to 100ms when not set. The OpenTelemetry default of 5000ms is what makes traces appear seconds after the action. Env override: OTEL_SPANS_FLUSH_INTERVAL_MS.
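The environment overrides mentioned in the table can be pictured with the stdlib sketch below. It is not the package's own resolution code. The function names are hypothetical, and both the case-insensitive matching and the fall-back to the default on unparseable input are assumptions.

```python
import os
from typing import Mapping, Optional

# Values the table lists as disabling OTel.
_FALSE_VALUES = {"false", "0", "no", "off"}


def otel_enabled_from_env(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Enabled unless OTEL_ENABLED holds one of the documented 'off' values."""
    env = os.environ if environ is None else environ
    raw = env.get("OTEL_ENABLED")
    if raw is None:
        return True
    return raw.strip().lower() not in _FALSE_VALUES


def spans_flush_interval_ms_from_env(
    environ: Optional[Mapping[str, str]] = None, default: int = 100
) -> int:
    """Read OTEL_SPANS_FLUSH_INTERVAL_MS, falling back to the 100ms default."""
    env = os.environ if environ is None else environ
    raw = env.get("OTEL_SPANS_FLUSH_INTERVAL_MS")
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        return default  # assumption: ignore unparseable values
```

Passing a dict instead of reading os.environ directly keeps the helpers easy to exercise in isolation.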

ReconnectionConfig

Configuration for WebSocket reconnection behavior.

Name                Type   Required  Description
backoff_multiplier  float  No        Exponential backoff multiplier. Default 2.0.
initial_delay_ms    int    No        Starting delay in milliseconds. Default 1000.
jitter_factor       float  No        Random jitter factor (0 to 1). Default 0.3.
max_delay_ms        int    No        Maximum delay cap in milliseconds. Default 30000.
max_retries         int    No        Maximum retry attempts. -1 for infinite. Default -1.
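How these fields typically combine is sketched below with the documented defaults. BackoffSettings is a hypothetical local stand-in, not the real ReconnectionConfig. The exact formula the SDK uses is not given on this page, so the capped exponential and the symmetric plus-or-minus jitter are assumptions.

```python
import random
from dataclasses import dataclass


@dataclass
class BackoffSettings:
    """Local mirror of the documented defaults (illustration only)."""

    initial_delay_ms: int = 1000
    backoff_multiplier: float = 2.0
    max_delay_ms: int = 30000
    jitter_factor: float = 0.3
    max_retries: int = -1


def base_delay_ms(cfg: BackoffSettings, attempt: int) -> float:
    """Exponential delay for a zero-based attempt, capped at max_delay_ms."""
    return min(cfg.initial_delay_ms * cfg.backoff_multiplier ** attempt, cfg.max_delay_ms)


def jittered_delay_ms(cfg: BackoffSettings, attempt: int) -> float:
    """Spread the base delay by +/- jitter_factor (assumed jitter model)."""
    base = base_delay_ms(cfg, attempt)
    spread = base * cfg.jitter_factor
    return base + random.uniform(-spread, spread)


def should_retry(cfg: BackoffSettings, attempt: int) -> bool:
    """A max_retries of -1 means retry forever."""
    return cfg.max_retries < 0 or attempt < cfg.max_retries


cfg = BackoffSettings()
schedule = [base_delay_ms(cfg, n) for n in range(6)]
```

Under these assumptions the un-jittered delay doubles from 1 second and reaches the 30 second cap on the sixth attempt.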

queue

Queue enqueue result types.

Import
from iii_helpers.queue import ...

Types

EnqueueResult

EnqueueResult

Result returned when a function is invoked with TriggerAction.Enqueue.

Name              Type  Required  Description
messageReceiptId  str   No        UUID assigned by the engine to the enqueued job.

stream

Stream trigger configs, change events, IO inputs, and update operations.

Import
from iii_helpers.stream import ...

Types

StreamAuthInput · StreamAuthResult · StreamChangeEvent · StreamChangeEventDetail · StreamDeleteInput · StreamDeleteResult · StreamGetInput · StreamJoinLeaveEvent · StreamJoinLeaveTriggerConfig · StreamJoinResult · StreamListGroupsInput · StreamListInput · StreamSetInput · StreamSetResult · StreamTriggerConfig · StreamUpdateInput · StreamUpdateResult · UpdateAppend · UpdateDecrement · UpdateIncrement · UpdateMerge · UpdateOpError · UpdateRemove · UpdateSet

StreamAuthInput

Input for stream authentication.

Name          Type                  Required  Description
addr          str                   Yes       -
headers       dict[str, str]        Yes       -
path          str                   Yes       -
query_params  dict[str, list[str]]  Yes       -

StreamAuthResult

Result of stream authentication.

Name     Type        Required  Description
context  Any | None  No        -

StreamChangeEvent

Handler input for stream triggers, fired when an item changes.

Name        Type                     Required  Description
event       StreamChangeEventDetail  Yes       -
groupId     str                      Yes       -
id          str | None               No        -
streamName  str                      Yes       -
timestamp   int                      Yes       -
type        Literal['stream']        Yes       -

StreamChangeEventDetail

Detail of a stream change event containing the mutation type and data.

Name  Type                                   Required  Description
data  Any                                    Yes       -
type  Literal['create', 'update', 'delete']  Yes       -

StreamDeleteInput

Input for stream delete operation.

Name         Type  Required  Description
group_id     str   Yes       -
item_id      str   Yes       -
stream_name  str   Yes       -

StreamDeleteResult

Result of stream delete operation.

Name       Type        Required  Description
old_value  Any | None  No        -

StreamGetInput

Input for stream get operation.

Name         Type  Required  Description
group_id     str   Yes       -
item_id      str   Yes       -
stream_name  str   Yes       -

StreamJoinLeaveEvent

Event for stream join/leave.

Name             Type        Required  Description
context          Any | None  No        -
group_id         str         Yes       -
id               str | None  No        -
stream_name      str         Yes       -
subscription_id  str         Yes       -

StreamJoinLeaveTriggerConfig

Trigger config for stream:join and stream:leave triggers.

Name                   Type        Required  Description
condition_function_id  str | None  No        -

StreamJoinResult

Result of stream join.

Name          Type  Required  Description
unauthorized  bool  Yes       -

StreamListGroupsInput

Input for stream list groups operation.

Name         Type  Required  Description
stream_name  str   Yes       -

StreamListInput

Input for stream list operation.

Name         Type  Required  Description
group_id     str   Yes       -
stream_name  str   Yes       -

StreamSetInput

Input for stream set operation.

Name         Type  Required  Description
data         Any   Yes       -
group_id     str   Yes       -
item_id      str   Yes       -
stream_name  str   Yes       -

StreamSetResult

Result of stream set operation.

Name       Type          Required  Description
new_value  TData         Yes       -
old_value  TData | None  No        -

StreamTriggerConfig

Trigger config for stream triggers. Filters which item changes fire the handler.

Name                   Type        Required  Description
condition_function_id  str | None  No        -
group_id               str | None  No        -
item_id                str | None  No        -
stream_name            str         Yes       -
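One plausible reading of the filtering is sketched below with plain dicts that reuse the documented field names (stream_name / group_id / item_id on the config, streamName / groupId / id on the event). Treating an unset filter as "match anything" is an assumption, matches_trigger is a hypothetical helper, and condition_function_id is not modelled.

```python
from typing import Any, Optional


def matches_trigger(config: dict[str, Optional[str]], event: dict[str, Any]) -> bool:
    """Return True when a change event passes the config's filters."""
    # stream_name is required, so it must always match.
    if config["stream_name"] != event["streamName"]:
        return False
    # Optional filters only constrain the event when they are set.
    group_id = config.get("group_id")
    if group_id is not None and group_id != event["groupId"]:
        return False
    item_id = config.get("item_id")
    if item_id is not None and item_id != event.get("id"):
        return False
    return True


# Sample event shaped like StreamChangeEvent (values are made up).
event = {
    "type": "stream",
    "streamName": "chat",
    "groupId": "room-1",
    "id": "msg-9",
    "timestamp": 1700000000000,
    "event": {"type": "update", "data": {"text": "hi"}},
}
```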

StreamUpdateInput

Input for stream update operation.

Name         Type              Required  Description
group_id     str               Yes       -
item_id      str               Yes       -
ops          list['UpdateOp']  Yes       -
stream_name  str               Yes       -

StreamUpdateResult

Result of stream update operation.

Name       Type                 Required  Description
errors     list[UpdateOpError]  No        -
new_value  TData                Yes       -
old_value  TData | None         No        -

UpdateAppend

Append an element to an array, concatenate a string, or push at a nested path. The target is the root (when path is omitted, an empty string, or an empty list), a single first-level key (when path is a non-empty string), or an arbitrary nested location (when path is a list of literal segments).

Path forms accepted (mirrors UpdateMerge after #1547):
  • None / "" / []: append at the root.
  • "foo": append at the first-level key foo. A dotted string like "a.b" is the literal key "a.b", not traversed as a -> b.
  • ["a", "b", "c"]: nested path; each element is a literal segment.

Engine semantics:
  • Missing/non-object intermediates along a nested path are auto-created/replaced with {}.
  • At the leaf:
    • missing/null + nested path -> [value] (always an array)
    • missing/null + single-string path -> value stored as a plain string when it is a string (the string-concat tier), otherwise [value]
    • existing array -> push
    • existing string + string value -> concatenate
    • existing object/scalar at the leaf -> append.type_mismatch

Validation: invalid paths (depth > 32 segments, segment > 256 bytes, or any __proto__ / constructor / prototype segment) are rejected with a structured error in the errors field of the state::update / stream::update response. The append does not apply when an error is returned for that op.

Name   Type              Required  Description
path   MergePath | None  No        -
type   str               No        -
value  Any               Yes       -
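The leaf rules above can be modelled in a few lines of plain Python. This is an illustrative re-implementation on dicts, not the engine's code. apply_append, append_leaf and AppendTypeMismatch are hypothetical names. Treating any list path as the "nested" form, and raising for an existing string with a non-string value, are assumptions where the description is silent. Path validation is not modelled.

```python
from typing import Any


class AppendTypeMismatch(Exception):
    """Stands in for the engine's append.type_mismatch error code."""


def append_leaf(existing: Any, value: Any, single_key: bool) -> Any:
    """Apply the documented leaf rules and return the new leaf value."""
    if existing is None:
        # Missing/null leaf: a string stays a string only for a single-string path.
        if single_key and isinstance(value, str):
            return value
        return [value]
    if isinstance(existing, list):
        return existing + [value]
    if isinstance(existing, str) and isinstance(value, str):
        return existing + value
    raise AppendTypeMismatch(type(existing).__name__)


def apply_append(doc: Any, path: Any, value: Any) -> Any:
    """Append value at path inside doc and return the updated document."""
    if isinstance(path, str) and path:
        segments, single_key = [path], True
    else:
        segments, single_key = list(path or []), False
    if not segments:
        return append_leaf(doc, value, single_key=False)  # root append
    parent = doc
    for segment in segments[:-1]:
        child = parent.get(segment)
        if not isinstance(child, dict):
            child = {}  # auto-create or replace non-object intermediates
            parent[segment] = child
        parent = child
    leaf = segments[-1]
    parent[leaf] = append_leaf(parent.get(leaf), value, single_key)
    return doc
```

For example, apply_append({}, "a.b", 1) writes the single literal key "a.b" rather than traversing a then b.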

UpdateDecrement

Decrement operation for stream update.
NameTypeRequiredDescription
byint | floatYes-
pathstrYes-
typestrNo-

UpdateIncrement

Increment operation for stream update.

Name  Type         Required  Description
by    int | float  Yes       -
path  str          Yes       -
type  str          No        -

UpdateMerge

Shallow merge an object into the target. The target is the root (when path is omitted, an empty string, or an empty list) or an arbitrary nested location specified by a list of literal segments.

Path forms accepted:
  • None / "" / []: merge at the root.
  • "foo": equivalent to ["foo"], a single first-level key.
  • ["a", "b", "c"]: nested path. Each element is a literal key. ["a.b"] writes a single key named "a.b", not a -> b.

Engine semantics:
  • Missing or non-object intermediates along the path are auto-replaced with {}.
  • The merge is shallow at the target node (top-level keys of value overwrite same-named keys; siblings preserved).

Validation: invalid paths/values (depth > 32 segments, segment > 256 bytes, value depth > 16, > 1024 top-level keys, or any __proto__ / constructor / prototype segment or top-level key) are rejected with a structured error in the errors array of the state::update / stream::update response. The merge does not apply when an error is returned.

Name   Type              Required  Description
path   MergePath | None  No        -
type   str               No        -
value  Any               Yes       -
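The path handling and the shallow-merge rule can be illustrated on plain dicts as follows. This is a sketch of the described semantics, not the engine's implementation. normalize_path and shallow_merge are hypothetical names, validation limits are not modelled, and replacing a non-object value at the target itself (not just at intermediates) is an assumption.

```python
from typing import Any, Union

# Local alias for illustration; the real MergePath type is not shown on this page.
PathLike = Union[str, list[str], None]


def normalize_path(path: PathLike) -> list[str]:
    """Map the three accepted path forms onto a list of literal segments."""
    if path is None or path == "" or path == []:
        return []
    if isinstance(path, str):
        return [path]  # "a.b" stays one literal key, it is not split on dots
    return list(path)


def shallow_merge(doc: dict[str, Any], path: PathLike, value: dict[str, Any]) -> dict[str, Any]:
    """Merge value's top-level keys into the node at path (mutates doc)."""
    target = doc
    for segment in normalize_path(path):
        child = target.get(segment)
        if not isinstance(child, dict):
            child = {}  # missing or non-object nodes become {}
            target[segment] = child
        target = child
    # Shallow: same-named keys are overwritten whole, siblings are kept.
    target.update(value)
    return doc


doc = {"a": {"b": {"x": 1, "y": 2}}, "n": 5}
shallow_merge(doc, ["a", "b"], {"y": 3, "z": 4})
```

After the call, doc["a"]["b"] holds x, y and z, while the sibling key n is untouched.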

UpdateOpError

Per-op error returned by state::update / stream::update. Currently emitted only by the merge op when input violates the new validation bounds. Successfully applied ops are still reflected in the response’s new_value.

Name      Type        Required  Description
code      str         Yes       -
doc_url   str | None  No        -
message   str         Yes       -
op_index  int         Yes       -
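Because errors are reported per op, a caller usually wants to pair each error with the op that caused it. The sketch below does that on plain dicts. split_ops_by_outcome is a hypothetical helper, the sample ops and error are made up, and reading op_index as a zero-based position in the submitted ops list is an assumption.

```python
from typing import Any


def split_ops_by_outcome(
    ops: list[dict[str, Any]], errors: list[dict[str, Any]]
) -> tuple[list[dict[str, Any]], list[tuple[dict[str, Any], str]]]:
    """Separate applied ops from rejected ones using each error's op_index."""
    failed = {error["op_index"]: error for error in errors}
    applied = [op for index, op in enumerate(ops) if index not in failed]
    rejected = [(ops[index], failed[index]["code"]) for index in sorted(failed)]
    return applied, rejected


# Sample request ops and a response-shaped errors list (illustrative values).
ops = [
    {"type": "set", "path": "status", "value": "open"},
    {"type": "append", "path": "count", "value": 1},
]
errors = [
    {"op_index": 1, "code": "append.type_mismatch", "message": "leaf is a scalar"},
]
applied, rejected = split_ops_by_outcome(ops, errors)
```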

UpdateRemove

Remove operation for stream update.

Name  Type  Required  Description
path  str   Yes       -
type  str   No        -

UpdateSet

Set operation for stream update.

Name   Type  Required  Description
path   str   Yes       -
type   str   No        -
value  Any   Yes       -

worker_connection_manager

RBAC auth and registration callback types.

Import
from iii_helpers.worker_connection_manager import ...

Types

AuthInput · AuthResult · OnFunctionRegistrationInput · OnFunctionRegistrationResult · OnTriggerRegistrationInput · OnTriggerRegistrationResult · OnTriggerTypeRegistrationInput · OnTriggerTypeRegistrationResult

AuthInput

Input passed to the RBAC auth function during WebSocket upgrade. Contains the HTTP headers, query parameters, and client IP from the connecting worker’s upgrade request.

Name          Type                  Required  Description
headers       dict[str, str]        No        HTTP headers from the WebSocket upgrade request.
ip_address    str                   No        IP address of the connecting client.
query_params  dict[str, list[str]]  No        Query parameters from the upgrade URL. Each key maps to a list of values to support repeated keys.

AuthResult

Return value from the RBAC auth function. Controls which functions the authenticated worker can invoke and what context is forwarded to the middleware.

Name                             Type              Required  Description
allow_function_registration      bool              No        -
allow_trigger_type_registration  bool              No        Whether the worker may register new trigger types.
allowed_functions                list[str]         No        Additional function IDs to allow beyond expose_functions.
allowed_trigger_types            list[str] | None  No        Trigger type IDs the worker may register triggers for. When None, all types are allowed.
context                          dict[str, Any]    No        Arbitrary context forwarded to the middleware function on every invocation.
forbidden_functions              list[str]         No        Function IDs to deny even if they match expose_functions.
function_registration_prefix     str | None        No        Optional prefix applied to all function IDs registered by this worker.
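The interplay of allowed_functions, forbidden_functions and expose_functions might be pictured as below. This is a guess at the decision order for illustration only. can_invoke is a hypothetical helper, glob-style matching for expose_functions is an assumption (this page does not define its syntax), and so is letting a forbidden entry win over an allowed one.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def can_invoke(
    function_id: str,
    expose_patterns: Iterable[str],
    allowed_functions: Iterable[str],
    forbidden_functions: Iterable[str],
) -> bool:
    """Decide whether a worker may invoke function_id (assumed precedence)."""
    # Deny list first: it applies even when expose_functions would match.
    if function_id in set(forbidden_functions):
        return False
    # Explicit additions beyond expose_functions.
    if function_id in set(allowed_functions):
        return True
    # Otherwise fall back to the exposed set (glob matching is an assumption).
    return any(fnmatchcase(function_id, pattern) for pattern in expose_patterns)


# Illustrative values only.
expose = ["orders::*"]
allowed = ["billing::charge"]
forbidden = ["orders::delete"]
```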

OnFunctionRegistrationInput

Input passed to the on_function_registration_function_id hook when a worker attempts to register a function through the RBAC port. Return an OnFunctionRegistrationResult with the (possibly mapped) fields, or raise an exception to deny the registration.

Name         Type                   Required  Description
context      dict[str, Any]         No        Auth context from AuthResult.context for this session.
description  str | None             No        Human-readable description of the function.
function_id  str                    No        ID of the function being registered.
metadata     dict[str, Any] | None  No        Arbitrary metadata attached to the function.

OnFunctionRegistrationResult

Result returned from the on_function_registration_function_id hook. Omitted fields keep the original value from the registration request.

Name         Type                   Required  Description
description  str | None             No        Mapped description.
function_id  str | None             No        Mapped function ID.
metadata     dict[str, Any] | None  No        Mapped metadata.
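The "omitted fields keep the original value" rule amounts to overlaying the hook result on the registration request. The sketch below shows that overlay with plain dicts. apply_registration_result is a hypothetical helper, and treating None the same as an omitted field is an assumption.

```python
from typing import Any

_MAPPABLE_FIELDS = ("function_id", "description", "metadata")


def apply_registration_result(
    request: dict[str, Any], result: dict[str, Any]
) -> dict[str, Any]:
    """Overlay the hook result on the request, keeping originals for omitted fields."""
    merged = dict(request)
    for field in _MAPPABLE_FIELDS:
        # None (or a missing key) is read as "not mapped".
        if result.get(field) is not None:
            merged[field] = result[field]
    return merged


# Illustrative values only.
request = {
    "function_id": "resize",
    "description": "Resize an image",
    "metadata": {"team": "media"},
}
merged = apply_registration_result(request, {"function_id": "tenant-a::resize"})
```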

OnTriggerRegistrationInput

Input passed to the on_trigger_registration_function_id hook when a worker attempts to register a trigger through the RBAC port. Return an OnTriggerRegistrationResult with the (possibly mapped) fields, or raise an exception to deny the registration.

Name          Type                   Required  Description
config        Any                    No        Trigger-specific configuration.
context       dict[str, Any]         No        Auth context from AuthResult.context for this session.
function_id   str                    No        ID of the function this trigger is bound to.
metadata      dict[str, Any] | None  No        Arbitrary metadata attached to the trigger.
trigger_id    str                    No        ID of the trigger being registered.
trigger_type  str                    No        Trigger type identifier.

OnTriggerRegistrationResult

Result returned from the on_trigger_registration_function_id hook. Omitted fields keep the original value from the registration request.

Name          Type        Required  Description
config        Any         No        Mapped trigger configuration.
function_id   str | None  No        Mapped function ID.
trigger_id    str | None  No        Mapped trigger ID.
trigger_type  str | None  No        Mapped trigger type.

OnTriggerTypeRegistrationInput

Input passed to the on_trigger_type_registration_function_id hook when a worker attempts to register a new trigger type through the RBAC port. Return an OnTriggerTypeRegistrationResult with the (possibly mapped) fields, or raise an exception to deny the registration.

Name             Type            Required  Description
context          dict[str, Any]  No        Auth context from AuthResult.context for this session.
description      str             No        Human-readable description of the trigger type.
trigger_type_id  str             No        ID of the trigger type being registered.

OnTriggerTypeRegistrationResult

Result returned from the on_trigger_type_registration_function_id hook. Omitted fields keep the original value from the registration request.

Name             Type        Required  Description
description      str | None  No        Mapped description.
trigger_type_id  str | None  No        Mapped trigger type ID.