Documentation Index
Fetch the complete documentation index at: https://iii.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
Goal
Use the State worker to share data between Functions without a separate database.
Steps
1. Enable the State worker
# Enable the State worker, backed by the key-value adapter.
workers:
  - name: iii-state
    config:
      adapter:
        name: kv
        config:
          store_method: file_based # Options: in_memory, file_based
          file_path: ./data/state_store.db # required for file_based
2. Write state
Node / TypeScript
Python
Rust
import { randomUUID } from 'node:crypto'
import { registerWorker, Logger } from 'iii-sdk'

// Connect this worker to the engine; III_URL overrides the local default.
const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

/**
 * users::create — stores a new user under scope "users", keyed by a freshly
 * generated id. Reads `name` and `email` from the input; returns `{ userId }`.
 */
iii.registerFunction('users::create', async (input) => {
  const logger = new Logger()
  // Imported explicitly: the global `crypto` object is not exposed without a
  // flag on every Node release, while `node:crypto` always is.
  const userId = randomUUID()
  const user = { id: userId, name: input.name, email: input.email }

  await iii.trigger({
    function_id: 'state::set',
    payload: { scope: 'users', key: userId, value: user },
  })

  logger.info('User saved to state', { userId })
  return { userId }
})

// Then call from another function or worker
const { userId } = await iii.trigger({
  function_id: 'users::create',
  payload: { name: 'Alice', email: 'alice@example.com' },
})
logger.info('Created user', { userId })
import os
import uuid

from iii import Logger, register_worker

# Connect this worker to the engine; III_URL overrides the local default.
iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


def create_user(input):
    """Store a new user under the "users" scope and return its generated id."""
    logger = Logger()
    user_id = str(uuid.uuid4())
    record = {"id": user_id, "name": input["name"], "email": input["email"]}
    set_request = {
        "function_id": "state::set",
        "payload": {"scope": "users", "key": user_id, "value": record},
    }
    iii.trigger(set_request)
    logger.info("User saved to state", {"userId": user_id})
    return {"userId": user_id}


iii.register_function("users::create", create_user)

# Then call from another function or worker
result = iii.trigger(
    {
        "function_id": "users::create",
        "payload": {"name": "Alice", "email": "alice@example.com"},
    }
)
print("Created user:", result["userId"])
use iii_sdk::{register_worker, InitOptions, Logger, RegisterFunction, TriggerRequest};
use serde_json::{json, Value};
use tokio::signal;

/// Registers `users::create`, then keeps the worker running until Ctrl-C.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string());
    let iii = register_worker(&url, InitOptions::default());

    // The handler gets its own handle so it can trigger `state::set`.
    let handler_iii = iii.clone();
    let create_user = RegisterFunction::new_async("users::create", move |input: Value| {
        let iii = handler_iii.clone();
        async move {
            let logger = Logger::new();
            let user_id = uuid::Uuid::new_v4().to_string();
            // Missing or non-string fields fall back to the empty string.
            let name = input["name"].as_str().unwrap_or("");
            let email = input["email"].as_str().unwrap_or("");

            let user = json!({ "id": user_id, "name": name, "email": email });
            let set_request = TriggerRequest {
                function_id: "state::set".into(),
                payload: json!({
                    "scope": "users",
                    "key": user_id,
                    "value": user,
                }),
                action: None,
                timeout_ms: None,
            };
            iii.trigger(set_request).await?;

            logger.info("User saved to state", Some(json!({ "userId": user_id })));
            Ok(json!({ "userId": user_id }))
        }
    });
    iii.register_function(create_user);

    signal::ctrl_c().await?;
    Ok(())
}

// Then call from another function or worker
let create_request = TriggerRequest {
    function_id: "users::create".into(),
    payload: json!({ "name": "Alice", "email": "alice@example.com" }),
    action: None,
    timeout_ms: None,
};
let result = iii.trigger(create_request).await?;
println!("Created user: {}", result["userId"]);
3. Read state
Node / TypeScript
Python
Rust
// users::get — returns whatever `state::get` yields for the requested user id.
iii.registerFunction('users::get', async (input) => {
  const logger = new Logger()
  const { userId } = input

  const stored = await iii.trigger({
    function_id: 'state::get',
    payload: { scope: 'users', key: userId },
  })

  logger.info('User retrieved', { userId })
  return stored
})

// Then call from another function or worker
const user = await iii.trigger({
  function_id: 'users::get',
  payload: { userId: 'some-user-id' },
})
logger.info('Retrieved user', user)
def get_user(input):
    """Return the value stored under the "users" scope for input["userId"]."""
    logger = Logger()
    user_id = input["userId"]
    get_request = {
        "function_id": "state::get",
        "payload": {"scope": "users", "key": user_id},
    }
    stored = iii.trigger(get_request)
    logger.info("User retrieved", {"userId": user_id})
    return stored


iii.register_function("users::get", get_user)

# Then call from another function or worker
user = iii.trigger(
    {
        "function_id": "users::get",
        "payload": {"userId": "some-user-id"},
    }
)
print("Retrieved user:", user)
// Import everything this snippet uses, not only `RegisterFunction` and `Value`.
use iii_sdk::{Logger, RegisterFunction, TriggerRequest};
use serde_json::{json, Value};

// The handler gets its own handle so it can trigger `state::get`.
let iii_clone = iii.clone();
// users::get — returns whatever `state::get` yields for the requested user id.
let reg = RegisterFunction::new_async("users::get", move |input: Value| {
    let iii = iii_clone.clone();
    async move {
        let logger = Logger::new();
        // A missing or non-string `userId` falls back to the empty key.
        let user_id = input["userId"].as_str().unwrap_or("");
        let user = iii
            .trigger(TriggerRequest {
                function_id: "state::get".into(),
                payload: json!({
                    "scope": "users",
                    "key": user_id,
                }),
                action: None,
                timeout_ms: None,
            })
            .await?;
        logger.info("User retrieved", Some(json!({ "userId": user_id })));
        Ok(user)
    }
});
iii.register_function(reg);

// Then call from another function or worker
let user = iii
    .trigger(TriggerRequest {
        function_id: "users::get".into(),
        payload: json!({ "userId": "some-user-id" }),
        action: None,
        timeout_ms: None,
    })
    .await?;
println!("Retrieved user: {:?}", user);
4. Atomic updates
Use state::update with update operations for safe concurrent modifications:
Node / TypeScript
Python
Rust
// users::record-login — sets `lastLogin` and bumps `loginCount` through a
// single `state::update` call.
iii.registerFunction('users::record-login', async (input) => {
  const logger = new Logger()
  const { userId } = input

  await iii.trigger({
    function_id: 'state::update',
    payload: {
      scope: 'users',
      key: userId,
      ops: [
        { type: 'set', path: 'lastLogin', value: new Date().toISOString() },
        { type: 'increment', path: 'loginCount', by: 1 },
      ],
    },
  })

  logger.info('User login recorded', { userId })
  return { updated: true }
})

// Then call from another function or worker
await iii.trigger({
  function_id: 'users::record-login',
  payload: { userId: 'some-user-id' },
})
logger.info('Login recorded')
from datetime import datetime, timezone


def record_login(input):
    """Set lastLogin and increment loginCount for input["userId"] in one update."""
    logger = Logger()
    user_id = input["userId"]
    login_ops = [
        {"type": "set", "path": "lastLogin", "value": datetime.now(timezone.utc).isoformat()},
        {"type": "increment", "path": "loginCount", "by": 1},
    ]
    iii.trigger(
        {
            "function_id": "state::update",
            "payload": {"scope": "users", "key": user_id, "ops": login_ops},
        }
    )
    logger.info("User login recorded", {"userId": user_id})
    return {"updated": True}


iii.register_function("users::record-login", record_login)

# Then call from another function or worker
iii.trigger(
    {
        "function_id": "users::record-login",
        "payload": {"userId": "some-user-id"},
    }
)
print("Login recorded")
// Import everything this snippet uses, not only `RegisterFunction` and `Value`.
use iii_sdk::{Logger, RegisterFunction, TriggerRequest};
use serde_json::{json, Value};

// The handler gets its own handle so it can trigger `state::update`.
let iii_clone = iii.clone();
// users::record-login — sets `lastLogin` and bumps `loginCount` through a
// single `state::update` call.
let reg = RegisterFunction::new_async("users::record-login", move |input: Value| {
    let iii = iii_clone.clone();
    async move {
        let logger = Logger::new();
        // A missing or non-string `userId` falls back to the empty key.
        let user_id = input["userId"].as_str().unwrap_or("");
        iii.trigger(TriggerRequest {
            function_id: "state::update".into(),
            payload: json!({
                "scope": "users",
                "key": user_id,
                "ops": [
                    { "type": "set", "path": "lastLogin", "value": chrono::Utc::now().to_rfc3339() },
                    { "type": "increment", "path": "loginCount", "by": 1 },
                ],
            }),
            action: None,
            timeout_ms: None,
        })
        .await?;
        logger.info("User login recorded", Some(json!({ "userId": user_id })));
        Ok(json!({ "updated": true }))
    }
});
iii.register_function(reg);

// Then call from another function or worker
let _ = iii
    .trigger(TriggerRequest {
        function_id: "users::record-login".into(),
        payload: json!({ "userId": "some-user-id" }),
        action: None,
        timeout_ms: None,
    })
    .await?;
println!("Login recorded");
Append operations are useful for incremental data such as transcript chunks, progress events, or small audit trails:
// One string append and one non-string append, sent in the same update.
const appendOps = [
  { type: 'append', path: 'transcript', value: 'Hello ' },
  { type: 'append', path: 'events', value: { type: 'chunk', offset: 0 } },
]

await iii.trigger({
  function_id: 'state::update',
  payload: { scope: 'calls', key: callId, ops: appendOps },
})
If transcript is missing, a string append creates a string. If events is missing, a non-string append creates an array with one element. Existing arrays receive one new element per append. Existing strings only accept string values.
Paths for set / append / increment / decrement / remove are first-level field names. user.name means the field literally named user.name; it does not traverse nested objects. Use path: "" to target the root value.
Atomic append rewrites and returns the full JSON value. For very large payloads or long-running high-throughput streams, store chunks as separate stream items or use channels instead of repeatedly appending to one large value.
Build per-session structured state with nested merge
merge is the one update op that accepts a nested path. Its path is either a single string (legacy / first-level field) or an array of literal segments. This makes it the right tool for accumulating per-session structured state — a transcript timeline keyed by session, an event log per user, the metadata sidebar of an in-flight call.
The example below builds audio::transcripts[<session-id>] = { "<timestamp>": chunk } incrementally. Two writers can append to different timestamps in the same session without stepping on each other; sibling timestamps are preserved by every merge.
Node / TypeScript
Python
Rust
record-transcript-chunk.ts
// Nested path: walks into "session-abc" → "metadata", auto-creating each
// intermediate object. Sibling keys at any level are preserved.
const mergeMetadata = {
  type: 'merge',
  path: ['session-abc', 'metadata'],
  value: { author: 'alice' },
}

// First-level form (sugar for path: ['session-abc']).
const mergeChunk = {
  type: 'merge',
  path: 'session-abc',
  value: { [Date.now()]: 'transcript chunk' },
}

await iii.trigger({
  function_id: 'state::update',
  payload: {
    scope: 'audio::transcripts',
    key: 'session-abc',
    ops: [mergeMetadata, mergeChunk],
  },
})
record_transcript_chunk.py
import time

# Nested path: merges into "session-abc" -> "metadata".
merge_metadata = {
    "type": "merge",
    "path": ["session-abc", "metadata"],
    "value": {"author": "alice"},
}

# First-level path: the chunk is keyed by the current time in milliseconds.
merge_chunk = {
    "type": "merge",
    "path": "session-abc",
    "value": {str(int(time.time() * 1000)): "transcript chunk"},
}

iii.trigger(
    {
        "function_id": "state::update",
        "payload": {
            "scope": "audio::transcripts",
            "key": "session-abc",
            "ops": [merge_metadata, merge_chunk],
        },
    }
)
record_transcript_chunk.rs
use iii_sdk::{TriggerRequest, UpdateOp};
use serde_json::{json, Value};
iii.trigger(TriggerRequest {
function_id: "state::update".into(),
payload: json!({
"scope": "audio::transcripts",
"key": "session-abc",
"ops": [
UpdateOp::merge_at_path(
["session-abc", "metadata"],
json!({ "author": "alice" }),
),
UpdateOp::merge_at(
"session-abc",
json!({ format!("{}", chrono::Utc::now().timestamp_millis()): "transcript chunk" }),
),
],
}),
action: None,
timeout_ms: None,
}).await?;
Each segment in a merge path array is a literal key. ["a.b"] writes a single key named "a.b", not a → b. To deep-merge sub-objects, compose multiple ops with deeper paths.
If the engine rejects a merge op (path > 32 segments, segment > 256 bytes, value > 16 levels deep, > 1024 top-level keys, or a __proto__ / constructor / prototype segment or top-level key), it returns an entry in the response’s errors array with a stable code, message, and doc_url. Successfully applied ops are still reflected in new_value.
Result
State is shared across all Functions in the system. Any Function can read or write to any scope/key pair. The Engine handles consistency and persistence based on the configured adapter.
The default file_based adapter stores state in a local file, so it only works for single-instance use. For production with multiple replicas, use an adapter backed by a shared store, such as Redis. See the State worker reference.