Todo App
Full CRUD backed by a custom stream implementation, showing iii.createStream and stream::get/set/delete.
This example builds a complete Todo API using the iii SDK directly. It covers custom stream registration (iii.createStream), reading and writing via stream::get / stream::set / stream::delete, and the helper stream::list for group-level queries.
Custom stream
The iii SDK lets you register a custom stream backed by any storage — in-memory, a database, or a remote service. The engine routes stream::get / stream::set / stream::delete / stream::list requests to your implementation.
// stream.ts
import { registerWorker } from 'iii-sdk'
export type Todo = {
id: string
description: string
groupId: string
createdAt: string
dueDate?: string
completedAt: string | null
}
const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')
let todos: Todo[] = []
iii.createStream('todo', {
get: async (input) => todos.find((t) => t.id === input.item_id) ?? null,
set: async (input) => {
const existing = todos.find((t) => t.id === input.item_id)
if (existing) {
const updated = { ...existing, ...input.data }
todos = todos.map((t) => (t.id === input.item_id ? updated : t))
return { old_value: existing, new_value: updated }
}
const newTodo: Todo = {
id: input.item_id,
groupId: input.group_id,
description: input.data.description,
createdAt: new Date().toISOString(),
dueDate: input.data.dueDate,
completedAt: null,
}
todos.push(newTodo)
return { old_value: undefined, new_value: newTodo }
},
delete: async (input) => {
const old_value = todos.find((t) => t.id === input.item_id)
todos = todos.filter((t) => t.id !== input.item_id)
return { old_value }
},
list: async (input) => todos.filter((t) => t.groupId === input.group_id),
listGroups: async () => [...new Set(todos.map((t) => t.groupId))],
update: async () => { throw new Error('Not implemented') },
})
export { iii }# stream.py
from __future__ import annotations
from typing import Any
from iii import IStream
from iii.stream import (
StreamDeleteInput, StreamDeleteResult, StreamGetInput, StreamListGroupsInput,
StreamListInput, StreamSetInput, StreamSetResult, StreamUpdateInput,
StreamUpdateResult,
)
from .iii import iii
class TodoStream(IStream[dict[str, Any]]):
def __init__(self) -> None:
self._todos: list[dict[str, Any]] = []
async def get(self, input: StreamGetInput) -> dict[str, Any] | None:
return next((t for t in self._todos if t["id"] == input.item_id), None)
async def set(self, input: StreamSetInput) -> StreamSetResult[dict[str, Any]] | None:
for i, todo in enumerate(self._todos):
if todo["id"] == input.item_id:
updated = {**todo, **input.data}
self._todos[i] = updated
return StreamSetResult(old_value=todo, new_value=updated)
new_todo = {
"id": input.item_id,
"groupId": input.group_id,
"description": input.data.get("description", ""),
"createdAt": input.data.get("createdAt"),
"dueDate": input.data.get("dueDate"),
"completedAt": None,
}
self._todos.append(new_todo)
return StreamSetResult(old_value=None, new_value=new_todo)
async def delete(self, input: StreamDeleteInput) -> StreamDeleteResult:
old_value = next((t for t in self._todos if t["id"] == input.item_id), None)
self._todos = [t for t in self._todos if t["id"] != input.item_id]
return StreamDeleteResult(old_value=old_value)
async def list(self, input: StreamListInput) -> list[dict[str, Any]]:
return [t for t in self._todos if t.get("groupId") == input.group_id]
async def list_groups(self, input: StreamListGroupsInput) -> list[str]:
return list({t["groupId"] for t in self._todos if "groupId" in t})
async def update(self, input: StreamUpdateInput) -> StreamUpdateResult[dict[str, Any]] | None:
return None
iii.create_stream("todo", TodoStream())// In Rust, use the Streams helper for atomic stream operations
use iii_sdk::{register_worker, InitOptions, Streams, UpdateOp};
let iii = register_worker("ws://localhost:49134", InitOptions::default());
let streams = Streams::new(iii.clone());
// Stream items are addressed as "stream_name::group_id::item_id"Create
import {Logger, TriggerAction} from 'iii-sdk'
import { iii } from './stream'
iii.registerFunction(
{ id: 'api.post.todo', description: 'Create a new todo' },
async (req: ApiRequest<{ description: string; dueDate?: string }>) => {
const logger = new Logger()
const { description, dueDate } = req.body ?? {}
if (!description) {
return { status_code: 400, body: { error: 'Description is required' } } satisfies ApiResponse
}
const todoId = `todo-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`
logger.info('Creating todo', { todoId })
const todo = await iii.trigger({
function_id: 'stream::set',
payload: {
stream_name: 'todo',
group_id: 'inbox',
item_id: todoId,
data: {
id: todoId,
description,
groupId: 'inbox',
createdAt: new Date().toISOString(),
dueDate,
completedAt: null,
},
},
})
logger.info('Todo created', { todoId })
return { status_code: 201, body: todo } satisfies ApiResponse
},
)
iii.registerTrigger({
type: 'http',
function_id: 'api.post.todo',
config: { api_path: 'todo', http_method: 'POST', description: 'Create a new todo' },
})import random, string, time
from datetime import datetime, timezone
from iii import ApiRequest, ApiResponse, Logger, TriggerAction
from .iii import iii
def _create_todo(data) -> ApiResponse:
logger = Logger()
req = ApiRequest(**data) if isinstance(data, dict) else data
description = req.body.get("description") if req.body else None
due_date = req.body.get("dueDate") if req.body else None
if not description:
return ApiResponse(status_code=400, body={"error": "Description is required"})
suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=7))
todo_id = f"todo-{int(time.time() * 1000)}-{suffix}"
logger.info("Creating todo", {"todoId": todo_id})
todo = iii.trigger({
"function_id": "stream::set",
"payload": {
"stream_name": "todo",
"group_id": "inbox",
"item_id": todo_id,
"data": {
"id": todo_id,
"description": description,
"createdAt": datetime.now(timezone.utc).isoformat(),
"dueDate": due_date,
"completedAt": None,
},
},
})
logger.info("Todo created", {"todoId": todo_id})
return ApiResponse(status_code=201, body=todo, headers={"Content-Type": "application/json"})
iii.register_function({"id": "api.post.todo"}, _create_todo)
iii.register_trigger({
"type": "http", "function_id": "api.post.todo",
"config": {"api_path": "todo", "http_method": "POST", "description": "Create a new todo"},
})use iii_sdk::{
Logger, RegisterFunctionMessage, RegisterTriggerInput,
Streams, UpdateOp, ApiRequest,
};
use serde_json::json;
iii.register_function(
RegisterFunctionMessage {
id: "api.post.todo".into(),
description: Some("Create a new todo".into()),
request_format: None, response_format: None,
metadata: None, invocation: None,
},
|input| async move {
let logger = Logger::new();
let req: ApiRequest = serde_json::from_value(input)?;
let description = req.body["description"].as_str().unwrap_or("").to_string();
if description.is_empty() {
return Ok(json!({
"status_code": 400,
"body": { "error": "Description is required" },
}));
}
let todo_id = format!("todo-{}", chrono::Utc::now().timestamp_millis());
logger.info("Creating todo", Some(json!({ "todoId": todo_id })));
let result = streams.update(
&format!("todo::inbox::{}", todo_id),
vec![
UpdateOp::set("id", json!(todo_id.clone())),
UpdateOp::set("description", json!(description)),
UpdateOp::set("createdAt", json!(chrono::Utc::now().to_rfc3339())),
UpdateOp::set("completedAt", json!(null)),
],
).await?;
logger.info("Todo created", Some(json!({ "todoId": todo_id })));
Ok(json!({ "status_code": 201, "body": result.new_value }))
},
);
iii.register_trigger(RegisterTriggerInput {
trigger_type: "http".into(),
function_id: "api.post.todo".into(),
config: json!({ "api_path": "todo", "http_method": "POST" }),
})?;Update
iii.registerFunction(
{ id: 'api.put.todo', description: 'Update a todo' },
async (req: ApiRequest) => {
const logger = new Logger()
const todoId = req.path_params?.id
const existing = todoId
? await iii.trigger<Todo | null>({
function_id: 'stream::get',
payload: {
stream_name: 'todo',
group_id: 'inbox',
item_id: todoId,
},
})
: null
if (!existing) {
return { status_code: 404, body: { error: 'Todo not found' } } satisfies ApiResponse
}
logger.info('Updating todo', { todoId })
const updated = await iii.trigger({
function_id: 'stream::set',
payload: {
stream_name: 'todo',
group_id: 'inbox',
item_id: todoId,
data: { ...existing, ...req.body },
},
})
logger.info('Todo updated', { todoId })
return { status_code: 200, body: updated } satisfies ApiResponse
},
)
iii.registerTrigger({
type: 'http',
function_id: 'api.put.todo',
config: { api_path: 'todo/:id', http_method: 'PUT', description: 'Update a todo' },
})def _update_todo(data) -> ApiResponse:
logger = Logger()
req = ApiRequest(**data) if isinstance(data, dict) else data
todo_id = req.path_params.get("id") if req.path_params else None
existing = iii.trigger({
"function_id": "stream::get",
"payload": {"stream_name": "todo", "group_id": "inbox", "item_id": todo_id},
}) if todo_id else None
if not existing:
return ApiResponse(status_code=404, body={"error": "Todo not found"})
logger.info("Updating todo", {"todoId": todo_id})
merged = {**existing, **(req.body or {})}
updated = iii.trigger({
"function_id": "stream::set",
"payload": {"stream_name": "todo", "group_id": "inbox", "item_id": todo_id, "data": merged},
})
logger.info("Todo updated", {"todoId": todo_id})
return ApiResponse(status_code=200, body=updated, headers={"Content-Type": "application/json"})
iii.register_function({"id": "api.put.todo"}, _update_todo)
iii.register_trigger({
"type": "http", "function_id": "api.put.todo",
"config": {"api_path": "todo/:id", "http_method": "PUT", "description": "Update a todo"},
})iii.register_function(
RegisterFunctionMessage {
id: "api.put.todo".into(),
description: Some("Update a todo".into()),
request_format: None, response_format: None,
metadata: None, invocation: None,
},
|input| async move {
let logger = Logger::new();
let req: ApiRequest = serde_json::from_value(input)?;
let todo_id = req.path_params.get("id").cloned().unwrap_or_default();
let mut ops = vec![];
if let Some(desc) = req.body.get("description") {
ops.push(UpdateOp::set("description", desc.clone()));
}
if let Some(checked) = req.body["checked"].as_bool() {
let completed_at = if checked { json!(chrono::Utc::now().to_rfc3339()) } else { json!(null) };
ops.push(UpdateOp::set("completedAt", completed_at));
}
if ops.is_empty() {
return Ok(json!({ "status_code": 400, "body": { "error": "No fields to update" } }));
}
logger.info("Updating todo", Some(json!({ "todoId": todo_id })));
let result = streams.update(&format!("todo::inbox::{}", todo_id), ops).await?;
logger.info("Todo updated", Some(json!({ "todoId": todo_id })));
Ok(json!({ "status_code": 200, "body": result.new_value }))
},
);
iii.register_trigger(RegisterTriggerInput {
trigger_type: "http".into(),
function_id: "api.put.todo".into(),
config: json!({ "api_path": "todo/:id", "http_method": "PUT" }),
})?;Delete
iii.registerFunction(
{ id: 'api.delete.todo', description: 'Delete a todo' },
async (req: ApiRequest<{ todoId: string }>) => {
const logger = new Logger()
const { todoId } = req.body ?? {}
if (!todoId) {
return { status_code: 400, body: { error: 'todoId is required' } } satisfies ApiResponse
}
logger.info('Deleting todo', { todoId })
iii.trigger({
function_id: 'stream::delete',
payload: {
stream_name: 'todo',
group_id: 'inbox',
item_id: todoId,
},
action: TriggerAction.Void(),
})
logger.info('Todo deleted', { todoId })
return { status_code: 200, body: { success: true } } satisfies ApiResponse
},
)
iii.registerTrigger({
type: 'http',
function_id: 'api.delete.todo',
config: { api_path: 'todo', http_method: 'DELETE', description: 'Delete a todo' },
})def _delete_todo(data) -> ApiResponse:
logger = Logger()
req = ApiRequest(**data) if isinstance(data, dict) else data
todo_id = req.body.get("todoId") if req.body else None
if not todo_id:
return ApiResponse(status_code=400, body={"error": "todoId is required"})
logger.info("Deleting todo", {"todoId": todo_id})
iii.trigger({
"function_id": "stream::delete",
"payload": {"stream_name": "todo", "group_id": "inbox", "item_id": todo_id},
"action": TriggerAction.Void(),
})
logger.info("Todo deleted", {"todoId": todo_id})
return ApiResponse(status_code=200, body={"success": True}, headers={"Content-Type": "application/json"})
iii.register_function({"id": "api.delete.todo"}, _delete_todo)
iii.register_trigger({
"type": "http", "function_id": "api.delete.todo",
"config": {"api_path": "todo", "http_method": "DELETE", "description": "Delete a todo"},
})use iii_sdk::{TriggerRequest, TriggerAction};
iii.register_function(
RegisterFunctionMessage {
id: "api.delete.todo".into(),
description: Some("Delete a todo".into()),
request_format: None, response_format: None,
metadata: None, invocation: None,
},
|input| async move {
let logger = Logger::new();
let req: ApiRequest = serde_json::from_value(input)?;
let todo_id = req.body["todoId"].as_str().unwrap_or("").to_string();
if todo_id.is_empty() {
return Ok(json!({ "status_code": 400, "body": { "error": "todoId is required" } }));
}
logger.info("Deleting todo", Some(json!({ "todoId": todo_id })));
iii.trigger(TriggerRequest {
function_id: "stream::delete".into(),
payload: json!({
"stream_name": "todo",
"group_id": "inbox",
"item_id": todo_id,
}),
action: Some(TriggerAction::Void),
timeout_ms: None,
}).await?;
logger.info("Todo deleted", Some(json!({ "todoId": todo_id })));
Ok(json!({ "status_code": 200, "body": { "success": true } }))
},
);
iii.register_trigger(RegisterTriggerInput {
trigger_type: "http".into(),
function_id: "api.delete.todo".into(),
config: json!({ "api_path": "todo", "http_method": "DELETE" }),
})?;Key concepts
iii.createStream(name, impl)registers a custom stream with get / set / delete / list / listGroups / update methods. The engine routesstream::*requests to this implementation.stream::setreturns the stored value — in the custom implementation above, thesethandler shapes the result.stream::getreturnsnullif the item doesn't exist — always guard before writing an update.- In Rust,
Streams::new(iii.clone())wraps the engine connection and providesstreams.update(key, ops)for atomic modifications.