iii

Observability

Structured logging, trace ID propagation, and automatic OpenTelemetry across a multi-step workflow.

Every iii function invocation receives a traceId (via new Logger()) that is automatically propagated to downstream queue handlers. All logs emitted through the context logger are structured JSON and correlated to the active trace.

Multi-step workflow with trace correlation

import {registerWorker, Logger, TriggerAction} from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134', {
  otel: {
    enabled: true,
    serviceName: 'orders-service',
    metricsEnabled: true,
  },
})

// Step 1 — HTTP handler
iii.registerFunction(
  { id: 'orders.create', description: 'Creates an order and starts the processing pipeline' },
  async (req: ApiRequest<{ customerId: string; amount: number; items: string[] }>) => {
    const logger = new Logger()
    const { customerId, amount, items } = req.body ?? {}

    if (!customerId || !amount) {
      return { status_code: 400, body: { error: 'customerId and amount are required' } }
    }

    const orderId = `order-${Date.now()}`

    logger.info('Order created', {
      orderId,
      customerId,
      amount,
      traceId: currentTraceId(), // same traceId will appear in downstream logs
    })

    await iii.trigger({
      function_id: 'state::set',
      payload: {
        scope: 'orders',
        key: orderId,
        value: { id: orderId, customerId, amount, items, status: 'created', createdAt: new Date().toISOString() },
      },
      action: TriggerAction.Void(),
    })

    await iii.trigger({
      function_id: 'enqueue',
      payload: { topic: 'order.process', data: { orderId, amount, customerId, items } },
      action: TriggerAction.Void(),
    })

    return { status_code: 201, body: { orderId, status: 'processing' } }
  },
)

iii.registerTrigger({
  type: 'http',
  function_id: 'orders.create',
  config: { api_path: 'orders', http_method: 'POST' },
})
from datetime import datetime, timezone
from iii import register_worker, InitOptions, ApiRequest, ApiResponse, Logger, TriggerAction, current_trace_id

iii = register_worker(address="ws://localhost:49134", options=InitOptions(worker_name="orders-service"))

def create_order(data) -> ApiResponse:
    logger = Logger()
    req = ApiRequest(**data) if isinstance(data, dict) else data
    body = req.body or {}
    customer_id = body.get("customerId")
    amount = body.get("amount")

    if not customer_id or not amount:
        return ApiResponse(status_code=400, body={"error": "customerId and amount are required"})

    order_id = f"order-{int(__import__('time').time() * 1000)}"

    logger.info("Order created", {
        "orderId": order_id,
        "customerId": customer_id,
        "amount": amount,
        "traceId": current_trace_id(),  # same traceId will appear in downstream logs
    })

    iii.trigger({
        "function_id": "state::set",
        "payload": {
            "scope": "orders",
            "key": order_id,
            "value": {
                "id": order_id,
                "customerId": customer_id,
                "amount": amount,
                "status": "created",
                "createdAt": datetime.now(timezone.utc).isoformat(),
            },
        },
        "action": TriggerAction.Void(),
    })

    iii.trigger({
        "function_id": "enqueue",
        "payload": {"topic": "order.process", "data": {"orderId": order_id, "amount": amount, "customerId": customer_id}},
        "action": TriggerAction.Void(),
    })

    return ApiResponse(status_code=201, body={"orderId": order_id, "status": "processing"})

iii.register_function({"id": "orders.create"}, create_order)
iii.register_trigger({
    "type": "http", "function_id": "orders.create",
    "config": {"api_path": "orders", "http_method": "POST"},
})
use iii_sdk::{register_worker, InitOptions, Logger, TriggerRequest, TriggerAction, types::ApiRequest, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;

let iii = register_worker("ws://localhost:49134", InitOptions::default());

iii.register_function(RegisterFunctionMessage { id: "orders.create".into(), ..Default::default() }, |input| async move {
    let logger = Logger();
    let req: ApiRequest = serde_json::from_value(input)?;

    let customer_id = req.body["customerId"].as_str().unwrap_or("").to_string();
    let amount = req.body["amount"].as_f64().unwrap_or(0.0);

    if customer_id.is_empty() || amount == 0.0 {
        return Ok(json!({
            "status_code": 400,
            "body": { "error": "customerId and amount are required" },
        }));
    }

    let order_id = format!("order-{}", chrono::Utc::now().timestamp_millis());

    logger.info("Order created", Some(json!({
        "orderId": order_id,
        "customerId": customer_id,
        "amount": amount,
        "traceId": current_trace_id(), // same traceId propagated to downstream handlers
    })));

    iii.trigger(TriggerRequest::new("state::set", json!({
        "scope": "orders",
        "key": order_id,
        "value": {
            "id": order_id,
            "customerId": customer_id,
            "amount": amount,
            "status": "created",
            "createdAt": chrono::Utc::now().to_rfc3339(),
        },
    })).action(TriggerAction::void())).await?;

    iii.trigger(TriggerRequest::new("enqueue", json!({
        "topic": "order.process",
        "data": { "orderId": order_id, "amount": amount, "customerId": customer_id },
    })).action(TriggerAction::void())).await?;

    Ok(json!({ "status_code": 201, "body": { "orderId": order_id, "status": "processing" } }))
});

iii.register_trigger(RegisterTriggerInput { type_: "http".into(), function_id: "orders.create".into(), config: json!({
    "api_path": "orders",
    "http_method": "POST",
}), ..Default::default() })?;

Step 2 — Queue processor

iii.registerFunction(
  { id: 'order.process', description: 'Processes a created order' },
  async (data: { orderId: string; amount: number; customerId: string }) => {
    const logger = new Logger()
    const { orderId, amount, customerId } = data

    logger.info('Processing order', { orderId, amount, customerId, traceId: currentTraceId() })

    await iii.trigger({
      function_id: 'state::set',
      payload: {
        scope: 'orders',
        key: orderId,
        value: { status: 'processed', processedAt: new Date().toISOString() },
      },
      action: TriggerAction.Void(),
    })

    await iii.trigger({
      function_id: 'enqueue',
      payload: { topic: 'order.notify', data: { orderId, customerId, amount, status: 'processed' } },
      action: TriggerAction.Void(),
    })

    logger.info('Order processed', { orderId, traceId: currentTraceId() })
  },
)

iii.registerTrigger({
  type: 'queue',
  function_id: 'order.process',
  config: { topic: 'order.process' },
})
from iii import Logger, current_trace_id

def process_order(data: dict) -> None:
    logger = Logger()
    order_id = data.get("orderId")
    amount = data.get("amount")
    customer_id = data.get("customerId")

    logger.info("Processing order", {
        "orderId": order_id,
        "amount": amount,
        "traceId": current_trace_id(),
    })

    iii.trigger({
        "function_id": "state::set",
        "payload": {
            "scope": "orders",
            "key": order_id,
            "value": {
                "status": "processed",
                "processedAt": datetime.now(timezone.utc).isoformat(),
            },
        },
        "action": TriggerAction.Void(),
    })

    iii.trigger({
        "function_id": "enqueue",
        "payload": {"topic": "order.notify", "data": {"orderId": order_id, "customerId": customer_id, "amount": amount, "status": "processed"}},
        "action": TriggerAction.Void(),
    })

    logger.info("Order processed", {"orderId": order_id, "traceId": current_trace_id()})

iii.register_function({"id": "order.process"}, process_order)
iii.register_trigger({"type": "queue", "function_id": "order.process", "config": {"topic": "order.process"}})
use iii_sdk::{Logger, TriggerRequest, TriggerAction, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;

iii.register_function(RegisterFunctionMessage { id: "order.process".into(), ..Default::default() }, |input| async move {
    let logger = Logger();
    let order_id = input["orderId"].as_str().unwrap_or("unknown");

    logger.info("Processing order", Some(json!({
        "orderId": order_id,
        "traceId": current_trace_id(),
    })));

    iii.trigger(TriggerRequest::new("enqueue", json!({
        "topic": "order.notify",
        "data": { "orderId": order_id, "customerId": input["customerId"] },
    })).action(TriggerAction::void())).await?;

    logger.info("Order processed", Some(json!({ "orderId": order_id })));
    Ok(json!(null))
});

iii.register_trigger(RegisterTriggerInput { type_: "queue".into(), function_id: "order.process".into(), config: json!({ "topic": "order.process" }), ..Default::default() })?;

Step 3 — Notification

iii.registerFunction(
  { id: 'order.notify', description: 'Sends order notification' },
  async (data: { orderId: string; customerId: string; status: string; amount: number }) => {
    const logger = new Logger()
    const { orderId, customerId, status, amount } = data

    logger.info('Sending order notification', {
      orderId,
      customerId,
      status,
      amount,
      traceId: currentTraceId(),
    })

    // integrate with your notification service here
    logger.info('Order notification sent', { orderId, traceId: currentTraceId() })
  },
)

iii.registerTrigger({
  type: 'queue',
  function_id: 'order.notify',
  config: { topic: 'order.notify' },
})
from iii import Logger, current_trace_id

def notify_order(data: dict) -> None:
    logger = Logger()
    order_id = data.get("orderId")
    customer_id = data.get("customerId")

    logger.info("Sending order notification", {
        "orderId": order_id,
        "customerId": customer_id,
        "traceId": current_trace_id(),
    })

    # integrate with your notification service here
    logger.info("Order notification sent", {"orderId": order_id, "traceId": current_trace_id()})

iii.register_function({"id": "order.notify"}, notify_order)
iii.register_trigger({"type": "queue", "function_id": "order.notify", "config": {"topic": "order.notify"}})
use iii_sdk::{Logger, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;

iii.register_function(RegisterFunctionMessage { id: "order.notify".into(), ..Default::default() }, |input| async move {
    let logger = Logger();
    let order_id = input["orderId"].as_str().unwrap_or("unknown");

    logger.info("Sending order notification", Some(json!({
        "orderId": order_id,
        "traceId": current_trace_id(),
    })));

    // integrate with your notification service here
    logger.info("Order notification sent", Some(json!({ "orderId": order_id })));
    Ok(json!(null))
});

iii.register_trigger(RegisterTriggerInput { type_: "queue".into(), function_id: "order.notify".into(), config: json!({ "topic": "order.notify" }), ..Default::default() })?;

OpenTelemetry setup

// Pass otel config to registerWorker()
const iii = registerWorker('ws://localhost:49134', {
  otel: {
    enabled: true,
    serviceName: 'my-service',
    serviceVersion: '1.0.0',
    metricsEnabled: true,
    metricsExportIntervalMs: 10000,
    reconnectionConfig: {
      maxRetries: 10,
    },
  },
})

The iii SDK exports traces and metrics automatically via the engine's OpenTelemetry pipeline (OTLP over the WebSocket). No separate exporter configuration is required.

# OTel is handled by the engine — just pass the worker name in InitOptions
from iii import register_worker, InitOptions, Logger, current_trace_id

iii = register_worker(address="ws://localhost:49134", options=InitOptions(worker_name="my-service"))

# Traces are automatically correlated via current_trace_id()
# Access trace_id in any handler:
logger = Logger()
print(current_trace_id())
# Cargo.toml — enable the telemetry feature
[dependencies]
iii-sdk = { version = "0.1", features = ["telemetry"] }
use iii_sdk::{register_worker, InitOptions, Logger};
use serde_json::json;

// Initialize OTLP before connecting
iii_sdk::telemetry::init_otlp("my-service").await?;

let iii = register_worker("ws://127.0.0.1:49134", InitOptions::default());

// current_trace_id() is available in every handler
let logger = Logger();
logger.info("My message", Some(json!({ "traceId": current_trace_id() })));

Logger methods

MethodTypeScriptPythonRust
Infologger.info(msg, metadata?)logger.info(msg, fields?)logger.info(msg, Some(json!({...})))
Warninglogger.warn(msg, metadata?)logger.warn(msg, fields?)logger.warn(msg, None)
Errorlogger.error(msg, metadata?)logger.error(msg, fields?)logger.error(msg, None)
Debuglogger.debug(msg, metadata?)logger.debug(msg, fields?)logger.debug(msg, None)

Key concepts

  • currentTraceId() / current_trace_id() is the W3C trace-context ID for the current invocation. It is automatically propagated to all downstream workers when you emit an event — no manual header passing is needed.
  • All logs are emitted as structured JSON via the engine's log.info / log.warn / log.error functions.
  • The Node.js SDK supports OTel configuration directly in registerWorker(). Python and Rust use the engine's built-in OTLP pipeline.
  • Log every step at entry and exit with the traceId to make multi-step flows fully observable.

On this page