Hello World
An HTTP endpoint that publishes an event and a background handler that processes it.
This example shows the two-step pattern at the heart of most iii workflows: an HTTP handler accepts a request and publishes an event to the queue, then a queue handler processes that event in the background and persists the result to state.
Worker setup
Every iii worker starts by initialising the SDK and connecting to the engine.
// worker.ts
import { registerWorker, Logger } from 'iii-sdk'
const iii = await registerWorker(process.env.III_URL ?? 'ws://localhost:49134')# worker.py
from iii import register_worker, InitOptions, ApiRequest, ApiResponse
iii = register_worker(
address="ws://localhost:49134",
options=InitOptions(worker_name="hello-worker"),
)// main.rs
use iii_sdk::{register_worker, InitOptions, Logger};
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let iii = register_worker("ws://127.0.0.1:49134", InitOptions::default());
// ... register functions
loop { tokio::time::sleep(std::time::Duration::from_secs(60)).await; }
}Step 1 — HTTP handler
Registers a function and binds it to an HTTP trigger. Returns immediately after publishing to the queue.
await iii.registerFunction(
{ id: 'hello::api', description: 'Receives hello request' },
async (req: ApiRequest) => {
const logger = new Logger()
const appName = 'III App'
const requestId = Math.random().toString(36).substring(7)
logger.info('Hello API called', { appName, requestId })
await iii.trigger({
function_id: 'greet::process',
payload: {
requestId,
appName,
greetingPrefix: process.env.GREETING_PREFIX ?? 'Hello',
timestamp: new Date().toISOString(),
},
action: TriggerAction.Enqueue({ queue: 'default' }),
})
return {
status_code: 200,
body: {
message: 'Hello request received! Processing in background.',
status: 'processing',
appName,
},
} satisfies ApiResponse
},
)
await iii.registerTrigger({
type: 'http',
function_id: 'hello::api',
config: { api_path: 'hello', http_method: 'GET' },
})import os
import random
import string
from datetime import datetime, timezone
from iii import Logger
def hello_api(data) -> ApiResponse:
logger = Logger()
req = ApiRequest(**data) if isinstance(data, dict) else data
app_name = "III App"
request_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=7))
logger.info("Hello API called", {"appName": app_name, "requestId": request_id})
iii.trigger({
"function_id": "enqueue",
"payload": {
"topic": "greet::process",
"data": {
"requestId": request_id,
"appName": app_name,
"greetingPrefix": os.environ.get("GREETING_PREFIX", "Hello"),
"timestamp": datetime.now(timezone.utc).isoformat(),
},
},
})
return ApiResponse(
status_code=200,
body={
"message": "Hello request received! Processing in background.",
"status": "processing",
"appName": app_name,
},
)
iii.register_function({"id": "hello::api"}, hello_api)
iii.register_trigger({
"type": "http",
"function_id": "hello::api",
"config": {"api_path": "hello", "http_method": "GET"},
})use iii_sdk::{Logger, TriggerAction, TriggerRequest, types::ApiRequest, RegisterFunctionMessage, RegisterTriggerInput};
iii.register_function(RegisterFunctionMessage { id: "hello::api".into(), ..Default::default() }, |input| async move {
let logger = Logger();
let app_name = "III App";
let request_id = uuid::Uuid::new_v4().to_string();
logger.info("Hello API called", Some(json!({
"appName": app_name,
"requestId": request_id,
})));
iii.trigger(
TriggerRequest::new("greet::process", json!({
"requestId": request_id,
"appName": app_name,
"greetingPrefix": std::env::var("GREETING_PREFIX").unwrap_or_else(|_| "Hello".to_string()),
"timestamp": chrono::Utc::now().to_rfc3339(),
}))
.action(TriggerAction::enqueue("default")),
)
.await?;
Ok(json!({
"status_code": 200,
"body": {
"message": "Hello request received! Processing in background.",
"status": "processing",
"appName": app_name,
},
}))
});
iii.register_trigger(RegisterTriggerInput { type_: "http".into(), function_id: "hello::api".into(), config: json!({
"api_path": "hello",
"http_method": "GET",
}), ..Default::default() })?;Step 2 — Queue handler
Consumes the event, builds the greeting, and persists it to state.
await iii.registerFunction(
{ id: 'greet::process', description: 'Processes greeting in background' },
async (data) => {
const logger = new Logger()
const { requestId, appName, greetingPrefix, timestamp } = data as {
requestId: string
appName: string
greetingPrefix: string
timestamp: string
}
logger.info('Processing greeting', { requestId, appName })
const greeting = `${greetingPrefix} ${appName}!`
await iii.trigger({
function_id: 'state::set',
payload: {
scope: 'greetings',
key: requestId,
value: {
greeting,
processedAt: new Date().toISOString(),
originalTimestamp: timestamp,
},
},
})
logger.info('Greeting processed', { requestId, greeting })
},
)from datetime import datetime, timezone
from iii import Logger
def greet_process(data: dict) -> None:
logger = Logger()
request_id = data.get("requestId", "unknown")
app_name = data.get("appName", "III App")
greeting_prefix = data.get("greetingPrefix", "Hello")
timestamp = data.get("timestamp", "")
logger.info("Processing greeting", {"requestId": request_id, "appName": app_name})
greeting = f"{greeting_prefix} {app_name}!"
iii.trigger({
"function_id": "state::set",
"payload": {
"scope": "greetings",
"key": request_id,
"data": {
"greeting": greeting,
"processedAt": datetime.now(timezone.utc).isoformat(),
"originalTimestamp": timestamp,
},
},
})
logger.info("Greeting processed", {"requestId": request_id, "greeting": greeting})
iii.register_function({"id": "greet::process"}, greet_process)
iii.register_trigger({
"type": "queue",
"function_id": "greet::process",
"config": {"topic": "greet::process"},
})iii.register_function(RegisterFunctionMessage { id: "greet::process".into(), ..Default::default() }, |input| async move {
let logger = Logger();
let request_id = input["requestId"].as_str().unwrap_or("unknown");
let app_name = input["appName"].as_str().unwrap_or("III App");
let prefix = input["greetingPrefix"].as_str().unwrap_or("Hello");
let timestamp = input["timestamp"].as_str().unwrap_or("");
logger.info("Processing greeting", Some(json!({
"requestId": request_id,
"appName": app_name,
})));
let greeting = format!("{} {}!", prefix, app_name);
iii.trigger(
TriggerRequest::new("state::set", json!({
"scope": "greetings",
"key": request_id,
"value": {
"greeting": greeting,
"processedAt": chrono::Utc::now().to_rfc3339(),
"originalTimestamp": timestamp,
},
}))
.action(TriggerAction::void()),
)
.await?;
logger.info("Greeting processed", Some(json!({ "requestId": request_id })));
Ok(json!(null))
});
iii.register_trigger(RegisterTriggerInput { type_: "queue".into(), function_id: "greet::process".into(), config: json!({
"topic": "greet::process",
}), ..Default::default() })?;Connect and run
Node.js connects automatically
The Node SDK establishes the WebSocket connection when you call registerWorker(). There is no separate connect() method. Keep the process alive so the worker stays registered.
// Node SDK connects on registerWorker() — keep the process alive
await new Promise(() => {})import time
def main() -> None:
while True:
time.sleep(60)// Already in the main() above — register_worker auto-connects
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}Test it
curl http://localhost:3111/hello
# {"message":"Hello request received! Processing in background.","status":"processing","appName":"III App"}Key concepts
iii.registerFunctionpairs a string ID with an async handler. The ID is referenced by all triggers bound to that function.iii.registerTriggerbinds a trigger type + config to a function ID. A function can have multiple triggers.iii.trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) })enqueues work to a named queue. The target function receives the payload as its input.iii.trigger({ function_id: 'state::set', payload: { scope, key, value }, action: TriggerAction.Void() })persists data to the engine's key-value store.