iii

Todo App

Full CRUD backed by a custom stream implementation, showing iii.createStream and stream::get/set/delete.

This example builds a complete Todo API using the iii SDK directly. It covers custom stream registration (iii.createStream), reading and writing via stream::get / stream::set / stream::delete, and the helper stream::list for group-level queries.

Custom stream

The iii SDK lets you register a custom stream backed by any storage — in-memory, a database, or a remote service. The engine routes stream::get / stream::set / stream::delete / stream::list requests to your implementation.

// stream.ts
import { registerWorker } from 'iii-sdk'

/**
 * A single todo item as held by the custom `todo` stream.
 */
export type Todo = {
  /** Item identifier; the stream stores the `item_id` of the write here. */
  id: string
  /** Group the item belongs to; the stream stores the `group_id` of the write here. */
  groupId: string
  /** Free-text description supplied by the caller. */
  description: string
  /** Creation time as an ISO 8601 string. */
  createdAt: string
  /** `null` on a freshly created todo; otherwise a string. */
  completedAt: null | string
  /** Optional due date, stored as supplied by the caller. */
  dueDate?: string
}

// Engine connection for this worker; III_URL overrides the local default.
const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

// In-memory backing store for the `todo` stream.
let todos: Todo[] = []

/**
 * Custom `todo` stream backed by the `todos` array.
 *
 * Items are addressed by the pair (group_id, item_id), so every lookup matches
 * on both; the same item_id may exist in different groups without colliding.
 */
iii.createStream('todo', {
  // Returns the addressed item, or `null` when it does not exist.
  get: async (input) =>
    todos.find((t) => t.groupId === input.group_id && t.id === input.item_id) ?? null,

  // Upsert: merges `input.data` into an existing item, or creates a new one.
  set: async (input) => {
    const existing = todos.find((t) => t.groupId === input.group_id && t.id === input.item_id)
    if (existing) {
      // `id` and `groupId` are the item's address. Pin them after the merge so
      // a stray field in `input.data` cannot make the item unreachable.
      const updated: Todo = {
        ...existing,
        ...input.data,
        id: existing.id,
        groupId: existing.groupId,
      }
      todos = todos.map((t) => (t === existing ? updated : t))
      return { old_value: existing, new_value: updated }
    }
    const newTodo: Todo = {
      id: input.item_id,
      groupId: input.group_id,
      description: input.data.description,
      // Keep the caller's timestamp when one is supplied; otherwise stamp it here.
      createdAt: input.data.createdAt ?? new Date().toISOString(),
      dueDate: input.data.dueDate,
      completedAt: null,
    }
    todos.push(newTodo)
    return { old_value: undefined, new_value: newTodo }
  },

  // Removes the addressed item and reports what was removed (`undefined` if nothing matched).
  delete: async (input) => {
    const old_value = todos.find((t) => t.groupId === input.group_id && t.id === input.item_id)
    todos = todos.filter((t) => t !== old_value)
    return { old_value }
  },

  // All items in one group.
  list: async (input) => todos.filter((t) => t.groupId === input.group_id),
  // Distinct group ids, in first-seen order.
  listGroups: async () => [...new Set(todos.map((t) => t.groupId))],
  // Partial updates are not supported by this example stream.
  update: async () => { throw new Error('Not implemented') },
})

export { iii }
# stream.py
from __future__ import annotations
from typing import Any
from iii import IStream
from iii.stream import (
    StreamDeleteInput, StreamDeleteResult, StreamGetInput, StreamListGroupsInput,
    StreamListInput, StreamSetInput, StreamSetResult, StreamUpdateInput,
    StreamUpdateResult,
)
from .iii import iii

class TodoStream(IStream[dict[str, Any]]):
    """In-memory implementation of the ``todo`` stream.

    Items are plain dicts kept in ``self._todos``. An item is addressed by the
    pair (``group_id``, ``item_id``), so lookups match on both and the same
    ``item_id`` may exist in different groups without colliding.
    """

    def __init__(self) -> None:
        self._todos: list[dict[str, Any]] = []

    def _find(self, group_id: str, item_id: str) -> int | None:
        """Return the index of the addressed item, or ``None`` if it is absent."""
        for i, todo in enumerate(self._todos):
            if todo.get("groupId") == group_id and todo["id"] == item_id:
                return i
        return None

    async def get(self, input: StreamGetInput) -> dict[str, Any] | None:
        """Return the addressed item, or ``None`` when it does not exist."""
        index = self._find(input.group_id, input.item_id)
        return None if index is None else self._todos[index]

    async def set(self, input: StreamSetInput) -> StreamSetResult[dict[str, Any]] | None:
        """Upsert: merge ``input.data`` into an existing item or create a new one.

        Returns a ``StreamSetResult`` carrying the previous value (``None`` on
        create) and the value now stored.
        """
        index = self._find(input.group_id, input.item_id)
        if index is not None:
            todo = self._todos[index]
            # "id" and "groupId" are the item's address. Pin them after the merge
            # so a stray field in input.data cannot make the item unreachable.
            updated = {
                **todo,
                **input.data,
                "id": input.item_id,
                "groupId": input.group_id,
            }
            self._todos[index] = updated
            return StreamSetResult(old_value=todo, new_value=updated)

        # Local import: this snippet's import header does not bring in datetime.
        from datetime import datetime, timezone

        new_todo = {
            "id": input.item_id,
            "groupId": input.group_id,
            "description": input.data.get("description", ""),
            # Keep the caller's timestamp when supplied; otherwise stamp it here
            # so createdAt is never None.
            "createdAt": input.data.get("createdAt") or datetime.now(timezone.utc).isoformat(),
            "dueDate": input.data.get("dueDate"),
            "completedAt": None,
        }
        self._todos.append(new_todo)
        return StreamSetResult(old_value=None, new_value=new_todo)

    async def delete(self, input: StreamDeleteInput) -> StreamDeleteResult:
        """Remove the addressed item; ``old_value`` is ``None`` if nothing matched."""
        index = self._find(input.group_id, input.item_id)
        old_value = None if index is None else self._todos.pop(index)
        return StreamDeleteResult(old_value=old_value)

    async def list(self, input: StreamListInput) -> list[dict[str, Any]]:
        """Return every item whose ``groupId`` equals ``input.group_id``."""
        return [t for t in self._todos if t.get("groupId") == input.group_id]

    async def list_groups(self, input: StreamListGroupsInput) -> list[str]:
        """Return the distinct group ids in first-seen order."""
        # dict.fromkeys de-duplicates while keeping insertion order, so the
        # result is deterministic (a set would not be).
        return list(dict.fromkeys(t["groupId"] for t in self._todos if "groupId" in t))

    async def update(self, input: StreamUpdateInput) -> StreamUpdateResult[dict[str, Any]] | None:
        """Partial updates are not supported by this example; always returns ``None``."""
        return None

# Register the implementation under the stream name "todo".
iii.create_stream("todo", TodoStream())
// In Rust, use the Streams helper for atomic stream operations
use iii_sdk::{register_worker, InitOptions, Streams, UpdateOp};

// Register this worker against the engine at the given WebSocket URL, using default options.
let iii = register_worker("ws://localhost:49134", InitOptions::default());
// The helper is built from a clone of the worker handle; the handlers below
// call `streams.update(key, ops)` on it.
let streams = Streams::new(iii.clone());
// Stream items are addressed as "stream_name::group_id::item_id"
// (for example, the handlers below use keys of the form "todo::inbox::<todo id>").

Create

// Type-only imports: the handlers on this page annotate with ApiRequest / ApiResponse
// and the update handler refers to Todo.
import type { ApiRequest, ApiResponse } from 'iii-sdk'
import { Logger, TriggerAction } from 'iii-sdk'
import type { Todo } from './stream'
import { iii } from './stream'

/**
 * `api.post.todo` — create a todo in the `inbox` group.
 *
 * Responds 400 when `description` is missing, not a string, or blank.
 * Otherwise writes the item through `stream::set` and responds 201 with the
 * value that call returns.
 */
iii.registerFunction(
  { id: 'api.post.todo', description: 'Create a new todo' },
  async (req: ApiRequest<{ description: string; dueDate?: string }>) => {
    const logger = new Logger()
    const description = req.body?.description
    const dueDate = req.body?.dueDate

    // The body is client-supplied JSON, so check the runtime type as well as
    // presence; a whitespace-only description counts as missing.
    if (typeof description !== 'string' || description.trim() === '') {
      return { status_code: 400, body: { error: 'Description is required' } } satisfies ApiResponse
    }

    // Millisecond timestamp plus a short random base-36 suffix.
    const todoId = `todo-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`

    logger.info('Creating todo', { todoId })

    const todo = await iii.trigger({
      function_id: 'stream::set',
      payload: {
        stream_name: 'todo',
        group_id: 'inbox',
        item_id: todoId,
        data: {
          id: todoId,
          description,
          groupId: 'inbox',
          createdAt: new Date().toISOString(),
          dueDate,
          completedAt: null,
        },
      },
    })

    logger.info('Todo created', { todoId })
    return { status_code: 201, body: todo } satisfies ApiResponse
  },
)

// Expose the function over HTTP: POST on api_path "todo".
iii.registerTrigger({
  type: 'http',
  function_id: 'api.post.todo',
  config: { api_path: 'todo', http_method: 'POST', description: 'Create a new todo' },
})
import random, string, time
from datetime import datetime, timezone
from iii import ApiRequest, ApiResponse, Logger, TriggerAction
from .iii import iii

def _create_todo(data) -> ApiResponse:
    """Handle ``api.post.todo``: create a todo in the ``inbox`` group.

    ``data`` is either an ``ApiRequest`` or a dict of its fields. Responds 400
    when ``description`` is missing, not a string, or blank; otherwise writes
    the item through ``stream::set`` and responds 201 with that call's result.
    """
    logger = Logger()
    req = ApiRequest(**data) if isinstance(data, dict) else data
    body = req.body or {}
    description = body.get("description")
    due_date = body.get("dueDate")

    # The body is client-supplied JSON: check the runtime type as well as
    # presence, and treat a whitespace-only description as missing.
    if not isinstance(description, str) or not description.strip():
        return ApiResponse(status_code=400, body={"error": "Description is required"})

    # Millisecond timestamp plus a 7-character random suffix.
    suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=7))
    todo_id = f"todo-{int(time.time() * 1000)}-{suffix}"

    logger.info("Creating todo", {"todoId": todo_id})

    todo = iii.trigger({
        "function_id": "stream::set",
        "payload": {
            "stream_name": "todo",
            "group_id": "inbox",
            "item_id": todo_id,
            "data": {
                "id": todo_id,
                "description": description,
                # Sent explicitly so the stored item carries its group even if
                # the stream implementation does not add it from group_id.
                "groupId": "inbox",
                "createdAt": datetime.now(timezone.utc).isoformat(),
                "dueDate": due_date,
                "completedAt": None,
            },
        },
    })

    logger.info("Todo created", {"todoId": todo_id})
    return ApiResponse(status_code=201, body=todo, headers={"Content-Type": "application/json"})

# Register the handler and expose it over HTTP: POST on api_path "todo".
iii.register_function({"id": "api.post.todo"}, _create_todo)
iii.register_trigger({
    "type": "http", "function_id": "api.post.todo",
    "config": {"api_path": "todo", "http_method": "POST", "description": "Create a new todo"},
})
use iii_sdk::{
    Logger, RegisterFunctionMessage, RegisterTriggerInput,
    Streams, UpdateOp, ApiRequest,
};
use serde_json::json;

// `api.post.todo` — create a todo in the `inbox` group.
//
// Responds 400 when `description` is missing, not a string, or blank.
// Otherwise writes the fields with `streams.update` and responds 201 with the
// resulting value.
iii.register_function(
    RegisterFunctionMessage {
        id: "api.post.todo".into(),
        description: Some("Create a new todo".into()),
        request_format: None, response_format: None,
        metadata: None, invocation: None,
    },
    |input| async move {
        let logger = Logger::new();
        let req: ApiRequest = serde_json::from_value(input)?;
        let description = req.body["description"].as_str().unwrap_or("").to_string();

        // A whitespace-only description counts as missing.
        if description.trim().is_empty() {
            return Ok(json!({
                "status_code": 400,
                "body": { "error": "Description is required" },
            }));
        }

        // Read the clock once so the id and `createdAt` agree.
        let now = chrono::Utc::now();
        // Milliseconds alone collide when two requests land in the same
        // millisecond, and a collision would merge both todos into one item.
        // The sub-second nanosecond suffix makes that far less likely.
        let todo_id = format!(
            "todo-{}-{:09}",
            now.timestamp_millis(),
            now.timestamp_subsec_nanos()
        );
        logger.info("Creating todo", Some(json!({ "todoId": todo_id })));

        let mut ops = vec![
            UpdateOp::set("id", json!(todo_id.clone())),
            UpdateOp::set("description", json!(description)),
            UpdateOp::set("groupId", json!("inbox")),
            UpdateOp::set("createdAt", json!(now.to_rfc3339())),
            UpdateOp::set("completedAt", json!(null)),
        ];
        // Carry the optional due date through when the client sent one.
        if let Some(due_date) = req.body.get("dueDate").filter(|v| !v.is_null()) {
            ops.push(UpdateOp::set("dueDate", due_date.clone()));
        }

        let result = streams
            .update(&format!("todo::inbox::{}", todo_id), ops)
            .await?;

        logger.info("Todo created", Some(json!({ "todoId": todo_id })));

        Ok(json!({ "status_code": 201, "body": result.new_value }))
    },
);

// Expose the function over HTTP: POST on api_path "todo".
iii.register_trigger(RegisterTriggerInput {
    trigger_type: "http".into(),
    function_id: "api.post.todo".into(),
    config: json!({ "api_path": "todo", "http_method": "POST" }),
})?;

Update

/** Fields a client may change through PUT; identity and creation time are not among them. */
type TodoPatch = Partial<Pick<Todo, 'description' | 'dueDate' | 'completedAt'>>

/**
 * `api.put.todo` — update an existing todo in the `inbox` group.
 *
 * Responds 404 when the path has no `id` or `stream::get` finds nothing.
 * Otherwise merges the allowed fields from the request body into the stored
 * item, writes it back through `stream::set`, and responds 200 with that
 * call's result.
 */
iii.registerFunction(
  { id: 'api.put.todo', description: 'Update a todo' },
  async (req: ApiRequest<TodoPatch>) => {
    const logger = new Logger()
    const todoId = req.path_params?.id

    const existing = todoId
      ? await iii.trigger<Todo | null>({
          function_id: 'stream::get',
          payload: {
            stream_name: 'todo',
            group_id: 'inbox',
            item_id: todoId,
          },
        })
      : null

    if (!existing) {
      return { status_code: 404, body: { error: 'Todo not found' } } satisfies ApiResponse
    }

    // Copy only the mutable fields that were actually sent. Spreading the raw
    // body would let a client overwrite `id`, `groupId` or `createdAt`.
    const patch: TodoPatch = req.body ?? {}
    const changes: TodoPatch = {}
    if (patch.description !== undefined) changes.description = patch.description
    if (patch.dueDate !== undefined) changes.dueDate = patch.dueDate
    if (patch.completedAt !== undefined) changes.completedAt = patch.completedAt

    logger.info('Updating todo', { todoId })

    const updated = await iii.trigger({
      function_id: 'stream::set',
      payload: {
        stream_name: 'todo',
        group_id: 'inbox',
        item_id: todoId,
        data: { ...existing, ...changes },
      },
    })

    logger.info('Todo updated', { todoId })
    return { status_code: 200, body: updated } satisfies ApiResponse
  },
)

// Expose the function over HTTP: PUT on api_path "todo/:id".
iii.registerTrigger({
  type: 'http',
  function_id: 'api.put.todo',
  config: { api_path: 'todo/:id', http_method: 'PUT', description: 'Update a todo' },
})
def _update_todo(data) -> ApiResponse:
    """Handle ``api.put.todo``: update an existing todo in the ``inbox`` group.

    ``data`` is either an ``ApiRequest`` or a dict of its fields. Responds 404
    when the path has no ``id`` or ``stream::get`` finds nothing; otherwise
    merges the allowed body fields into the stored item, writes it back through
    ``stream::set`` and responds 200 with that call's result.
    """
    # Only these fields may be changed by a client; "id", "groupId" and
    # "createdAt" identify the item and must survive the merge untouched.
    mutable_fields = ("description", "dueDate", "completedAt")

    logger = Logger()
    req = ApiRequest(**data) if isinstance(data, dict) else data
    todo_id = req.path_params.get("id") if req.path_params else None

    existing = iii.trigger({
        "function_id": "stream::get",
        "payload": {"stream_name": "todo", "group_id": "inbox", "item_id": todo_id},
    }) if todo_id else None

    if not existing:
        return ApiResponse(status_code=404, body={"error": "Todo not found"})

    logger.info("Updating todo", {"todoId": todo_id})

    changes = {k: v for k, v in (req.body or {}).items() if k in mutable_fields}
    merged = {**existing, **changes}
    updated = iii.trigger({
        "function_id": "stream::set",
        "payload": {"stream_name": "todo", "group_id": "inbox", "item_id": todo_id, "data": merged},
    })

    logger.info("Todo updated", {"todoId": todo_id})
    return ApiResponse(status_code=200, body=updated, headers={"Content-Type": "application/json"})

# Register the handler and expose it over HTTP: PUT on api_path "todo/:id".
iii.register_function({"id": "api.put.todo"}, _update_todo)
iii.register_trigger({
    "type": "http", "function_id": "api.put.todo",
    "config": {"api_path": "todo/:id", "http_method": "PUT", "description": "Update a todo"},
})
// `api.put.todo` — update fields of a todo in the `inbox` group.
//
// Recognised body fields: `description`, `dueDate`, and the boolean `checked`
// (true stamps `completedAt` with the current time, false clears it).
// Responds 404 when the path carries no id and 400 when the body holds none
// of the recognised fields.
//
// NOTE(review): unlike the TypeScript and Python handlers there is no
// existence check before the write. Whether `streams.update` creates a missing
// item is not visible here — confirm.
iii.register_function(
    RegisterFunctionMessage {
        id: "api.put.todo".into(),
        description: Some("Update a todo".into()),
        request_format: None, response_format: None,
        metadata: None, invocation: None,
    },
    |input| async move {
        let logger = Logger::new();
        let req: ApiRequest = serde_json::from_value(input)?;
        let todo_id = req.path_params.get("id").cloned().unwrap_or_default();

        // An empty id would address the key "todo::inbox::"; reject it up front.
        if todo_id.is_empty() {
            return Ok(json!({ "status_code": 404, "body": { "error": "Todo not found" } }));
        }

        let mut ops = vec![];
        if let Some(desc) = req.body.get("description") {
            ops.push(UpdateOp::set("description", desc.clone()));
        }
        if let Some(due_date) = req.body.get("dueDate") {
            ops.push(UpdateOp::set("dueDate", due_date.clone()));
        }
        if let Some(checked) = req.body["checked"].as_bool() {
            let completed_at = if checked { json!(chrono::Utc::now().to_rfc3339()) } else { json!(null) };
            ops.push(UpdateOp::set("completedAt", completed_at));
        }

        if ops.is_empty() {
            return Ok(json!({ "status_code": 400, "body": { "error": "No fields to update" } }));
        }

        logger.info("Updating todo", Some(json!({ "todoId": todo_id })));
        let result = streams.update(&format!("todo::inbox::{}", todo_id), ops).await?;
        logger.info("Todo updated", Some(json!({ "todoId": todo_id })));

        Ok(json!({ "status_code": 200, "body": result.new_value }))
    },
);

// Expose the function over HTTP: PUT on api_path "todo/:id".
iii.register_trigger(RegisterTriggerInput {
    trigger_type: "http".into(),
    function_id: "api.put.todo".into(),
    config: json!({ "api_path": "todo/:id", "http_method": "PUT" }),
})?;

Delete

/**
 * `api.delete.todo` — delete a todo from the `inbox` group.
 *
 * Expects `{ todoId }` in the request body and responds 400 when it is missing,
 * empty, or not a string. The delete is sent to `stream::delete` with the Void
 * trigger action, so the 200 response does not carry the stream's result.
 */
iii.registerFunction(
  { id: 'api.delete.todo', description: 'Delete a todo' },
  async (req: ApiRequest<{ todoId: string }>) => {
    const logger = new Logger()
    const todoId = req.body?.todoId

    // The body is client-supplied JSON, so check the runtime type as well as presence.
    if (typeof todoId !== 'string' || todoId === '') {
      return { status_code: 400, body: { error: 'todoId is required' } } satisfies ApiResponse
    }

    logger.info('Deleting todo', { todoId })

    // Await the call so a failure rejects this handler instead of surfacing
    // later as an unhandled promise rejection.
    await iii.trigger({
      function_id: 'stream::delete',
      payload: {
        stream_name: 'todo',
        group_id: 'inbox',
        item_id: todoId,
      },
      action: TriggerAction.Void(),
    })

    logger.info('Todo deleted', { todoId })
    return { status_code: 200, body: { success: true } } satisfies ApiResponse
  },
)

// Expose the function over HTTP: DELETE on api_path "todo".
iii.registerTrigger({
  type: 'http',
  function_id: 'api.delete.todo',
  config: { api_path: 'todo', http_method: 'DELETE', description: 'Delete a todo' },
})
def _delete_todo(data) -> ApiResponse:
    """Handle ``api.delete.todo``: delete a todo from the ``inbox`` group.

    ``data`` is either an ``ApiRequest`` or a dict of its fields. The body must
    carry ``todoId``; a missing or empty value yields a 400. The delete is sent
    to ``stream::delete`` with the Void trigger action and the handler answers
    200 with ``{"success": True}``.
    """
    logger = Logger()
    if isinstance(data, dict):
        req = ApiRequest(**data)
    else:
        req = data

    todo_id = None
    if req.body:
        todo_id = req.body.get("todoId")

    if not todo_id:
        return ApiResponse(status_code=400, body={"error": "todoId is required"})

    log_context = {"todoId": todo_id}
    logger.info("Deleting todo", log_context)

    target = {"stream_name": "todo", "group_id": "inbox", "item_id": todo_id}
    iii.trigger({
        "function_id": "stream::delete",
        "payload": target,
        "action": TriggerAction.Void(),
    })

    logger.info("Todo deleted", log_context)
    return ApiResponse(
        status_code=200,
        body={"success": True},
        headers={"Content-Type": "application/json"},
    )

# Register the handler and expose it over HTTP: DELETE on api_path "todo".
iii.register_function({"id": "api.delete.todo"}, _delete_todo)
iii.register_trigger({
    "type": "http",
    "function_id": "api.delete.todo",
    "config": {
        "api_path": "todo",
        "http_method": "DELETE",
        "description": "Delete a todo",
    },
})
use iii_sdk::{TriggerRequest, TriggerAction};

// `api.delete.todo` — delete a todo from the `inbox` group.
//
// The body must carry a non-empty string `todoId`, otherwise the handler
// answers 400. The delete is sent to `stream::delete` with the Void trigger
// action and the handler answers 200 with `{ "success": true }`.
iii.register_function(
    RegisterFunctionMessage {
        id: "api.delete.todo".into(),
        description: Some("Delete a todo".into()),
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    |input| async move {
        let logger = Logger::new();
        let req: ApiRequest = serde_json::from_value(input)?;

        // Anything other than a non-empty string is treated as "missing".
        let todo_id = match req.body["todoId"].as_str() {
            Some(id) if !id.is_empty() => id.to_string(),
            _ => {
                return Ok(json!({ "status_code": 400, "body": { "error": "todoId is required" } }));
            }
        };

        let log_context = json!({ "todoId": todo_id });
        logger.info("Deleting todo", Some(log_context.clone()));

        let delete_request = TriggerRequest {
            function_id: "stream::delete".into(),
            payload: json!({
                "stream_name": "todo",
                "group_id": "inbox",
                "item_id": todo_id,
            }),
            action: Some(TriggerAction::Void),
            timeout_ms: None,
        };
        iii.trigger(delete_request).await?;

        logger.info("Todo deleted", Some(log_context));

        Ok(json!({ "status_code": 200, "body": { "success": true } }))
    },
);

// Expose the function over HTTP: DELETE on api_path "todo".
iii.register_trigger(RegisterTriggerInput {
    trigger_type: "http".into(),
    function_id: "api.delete.todo".into(),
    config: json!({ "api_path": "todo", "http_method": "DELETE" }),
})?;

Key concepts

  • iii.createStream(name, impl) registers a custom stream with get / set / delete / list / listGroups / update methods. The engine routes stream::* requests to this implementation.
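  • stream::set returns the result produced by the stream's set handler — in the custom implementation above that is { old_value, new_value }, where new_value is the stored todo and old_value is the previous one (undefined / None on create).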
  • stream::get returns null if the item doesn't exist — always guard before writing an update.
  • In Rust, Streams::new(iii.clone()) wraps the engine connection and provides streams.update(key, ops) for atomic modifications.

On this page