iii

Triggers

Triggers are entrypoints into a iii system. Each trigger defines the conditions that cause it to fire, a payload it accepts, and a function that it will invoke. When a trigger fires, the function is invoked with that payload.

Trigger Components

A trigger consists of three parts:

  1. Trigger Type: The mechanism that initiates execution (http, queue, cron, log, stream)
  2. Configuration: Type-specific settings (path, schedule, topics)
  3. Function ID: The function to invoke when triggered
iii.registerTrigger({
  type: 'http',
  function_id: fn.id,
  config: {
    api_path: '/users',
    http_method: 'POST',
  },
})
iii.register_trigger({
    'type': 'http',
    'function_id': fn.id,
    'config': {'api_path': '/users', 'http_method': 'POST'},
})
use iii_sdk::RegisterTriggerInput;
use serde_json::json;

iii.register_trigger(RegisterTriggerInput { type_: "http".into(), function_id: "users.create".into(), config: json!({
    "api_path": "/users",
    "http_method": "POST"
}), ..Default::default() })?;

Trigger Pipeline

Core Trigger Types

HTTP Trigger (http)

Executes functions in response to HTTP requests.

Provided by: HTTP Module

Configuration:

{
  type: 'http',
  function_id: fn.id,
  config: {
    api_path: '/users/:id',
    http_method: 'GET'
  }
}
{
    'type': 'http',
    'function_id': fn.id,
    'config': {
        'api_path': '/users/:id',
        'http_method': 'GET',
    },
}
json!({
    "type": "http",
    "function_id": "users.get",
    "config": {
        "api_path": "/users/:id",
        "http_method": "GET"
    }
})

Input: ApiRequest with path params, query params, body, headers

Output: ApiResponse with status_code, body, headers

Conditions: Optional. Add condition_function_id to config with a function ID. The engine invokes it before the handler; if it returns false, the handler function is not called. See Trigger Conditions.

Path Parameters: Segments written as :name in api_path capture the matching part of the URL and are passed to the handler in path_params.

// Trigger: api_path: '/users/:userId/posts/:postId'
// Request: GET /users/123/posts/456
// Handler receives: { path_params: { userId: '123', postId: '456' } }
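
To make the mapping above concrete, here is a minimal Python sketch of colon-prefixed path matching. It is only an illustration of the behavior described here, not the engine's router; the function name match_path is hypothetical, and real matching rules (trailing slashes, wildcards, URL decoding) may differ.

```python
from typing import Dict, Optional


def match_path(pattern: str, path: str) -> Optional[Dict[str, str]]:
    """Return the extracted path params, or None when the path does not match."""
    pattern_parts = pattern.strip("/").split("/")
    path_parts = path.strip("/").split("/")
    if len(pattern_parts) != len(path_parts):
        return None

    params: Dict[str, str] = {}
    for expected, actual in zip(pattern_parts, path_parts):
        if expected.startswith(":"):
            params[expected[1:]] = actual  # ":userId" captures this segment
        elif expected != actual:
            return None  # a literal segment did not match
    return params


print(match_path("/users/:userId/posts/:postId", "/users/123/posts/456"))
# {'userId': '123', 'postId': '456'}
```

Captured values stay strings, which matches the quoted '123' and '456' in the example above.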

HTTP Module

Learn more about the HTTP trigger

The http Utility Function

The simple ApiResponse return pattern works well for JSON endpoints, but some use cases require full control over the response lifecycle — streaming a file download, sending Server-Sent Events, or setting headers incrementally. The http() utility function enables this.

When you wrap your handler with http(), it receives two arguments instead of one: the original request and a response object that gives you imperative control over the outgoing response:

import { http } from 'iii-sdk'
import type { HttpRequest, HttpResponse } from 'iii-sdk'

const fn = iii.registerFunction(
  { id: 'api.example' },
  http(async (req: HttpRequest, response: HttpResponse) => {
    response.status(200)
    response.headers({ 'content-type': 'application/json' })
    response.stream.end(Buffer.from(JSON.stringify({ ok: true })))
  }),
)
import json
from iii import register_worker, http
from iii.types import HttpRequest, HttpResponse

@http
async def handler(req: HttpRequest, response: HttpResponse):
    await response.status(200)
    await response.headers({"content-type": "application/json"})
    await response.writer.write(json.dumps({"ok": True}).encode("utf-8"))
    await response.writer.close_async()

fn_ref = iii_client.register_function({"id": "api.example"}, handler)

In Rust, the HTTP handler receives the raw Value input. You extract channel refs to obtain a ChannelWriter for the response and use send_message to set status and headers:

use iii_sdk::{III, IIIError, ChannelWriter, extract_channel_refs, ChannelDirection, RegisterFunctionMessage};
use serde_json::{json, Value};

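// Clone the client up front so the handler closure can own its own copy.
let iii_for_handler = iii.clone();
iii.register_function(RegisterFunctionMessage { id: "api.example".into(), ..Default::default() }, move |input: Value| {
    let iii = iii_for_handler.clone();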
    async move {
        let refs = extract_channel_refs(&input);
        let writer_ref = refs.iter()
            .find(|(_, r)| matches!(r.direction, ChannelDirection::Write))
            .map(|(_, r)| r.clone())
            .expect("missing writer ref");

        let writer = ChannelWriter::new(iii.address(), &writer_ref);

        writer.send_message(&serde_json::to_string(
            &json!({"type": "set_status", "status_code": 200})
        ).unwrap()).await.map_err(|e| IIIError::Handler(e.to_string()))?;

        writer.send_message(&serde_json::to_string(
            &json!({"type": "set_headers", "headers": {"content-type": "application/json"}})
        ).unwrap()).await.map_err(|e| IIIError::Handler(e.to_string()))?;

        writer.write(b"{\"ok\":true}").await.map_err(|e| IIIError::Handler(e.to_string()))?;
        writer.close().await.map_err(|e| IIIError::Handler(e.to_string()))?;

        Ok(Value::Null)
    }
});

The response object provides these capabilities across all SDKs:

| Capability | Node / TypeScript | Python | Rust |
| --- | --- | --- | --- |
| Set status code | response.status(code) | await response.status(code) | writer.send_message(set_status json) |
| Set headers | response.headers(map) | await response.headers(map) | writer.send_message(set_headers json) |
| Write body data | response.stream.write(data) | response.stream.write(data) (sync) or await response.writer.write(data) (async) | writer.write(&data).await |
| Access writable stream | response.stream | response.stream (WritableStream) | N/A (use writer directly) |
| Access channel writer | N/A | response.writer (ChannelWriter) | writer |
| Close response | response.stream.end() / response.close() | response.close() (sync) or await response.writer.close_async() (async) | writer.close().await |

In Python, the http wrapper can be used as a decorator (@http) on an async def handler. The wrapped handler must be an async function that accepts (req: HttpRequest, response: HttpResponse) and optionally returns an ApiResponse. Import it with from iii import http.

Under the hood, the wrapper unwraps the internal request, constructs the response object that maps status() and headers() calls to control messages sent over the underlying channel, and exposes the channel's writable stream directly. This means you can pipe file streams, write SSE frames, or send any binary data through the response stream.
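
The mapping from status() and headers() calls to control messages can be pictured with a small stand-alone sketch. The message shapes ({"type": "set_status", ...} and {"type": "set_headers", ...}) are taken from the Rust examples on this page; the class name SketchResponse and the in-memory list and buffer standing in for the channel are assumptions made for illustration, not the SDK's implementation.

```python
import json
from typing import Dict, List


class SketchResponse:
    """Toy response object: status()/headers() become JSON control messages."""

    def __init__(self) -> None:
        self.control_messages: List[str] = []  # stands in for the channel's message side
        self.body = bytearray()                # stands in for the writable stream

    def status(self, code: int) -> None:
        self.control_messages.append(
            json.dumps({"type": "set_status", "status_code": code})
        )

    def headers(self, headers: Dict[str, str]) -> None:
        self.control_messages.append(
            json.dumps({"type": "set_headers", "headers": headers})
        )

    def write(self, data: bytes) -> None:
        self.body.extend(data)  # body bytes bypass the control messages


response = SketchResponse()
response.status(200)
response.headers({"content-type": "application/json"})
response.write(json.dumps({"ok": True}).encode("utf-8"))

for message in response.control_messages:
    print(message)
print(bytes(response.body))
```

Keeping control messages separate from body bytes is what lets the body side carry arbitrary binary data, such as file contents or SSE frames.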

Use the HTTP wrapper when you need streaming responses. For simple JSON endpoints, returning a response dict/object directly is simpler and sufficient.

File Download

Use the HTTP wrapper to stream a file to the client without buffering the entire file in memory:

import * as fs from 'node:fs'
import { pipeline } from 'node:stream/promises'
import { http } from 'iii-sdk'
import type { HttpRequest, HttpResponse } from 'iii-sdk'

const fn = iii.registerFunction(
  { id: 'api.download.pdf' },
  http(async (_req: HttpRequest, response: HttpResponse) => {
    const fileStream = fs.createReadStream('/path/to/report.pdf')

    response.status(200)
    response.headers({ 'content-type': 'application/pdf' })

    await pipeline(fileStream, response.stream)
  }),
)

iii.registerTrigger({
  type: 'http',
  function_id: fn.id,
  config: {
    api_path: '/download/report',
    http_method: 'GET',
  },
})

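pipeline() handles backpressure and error propagation between the file read stream and the response write stream automatically. The Python and Rust versions of this example read the whole file into memory before writing it, which keeps them short; for large files, write the file to the response in chunks instead.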

from pathlib import Path
from iii import http
from iii.types import HttpRequest, HttpResponse

@http
async def handler(req: HttpRequest, response: HttpResponse):
    pdf_data = Path("/path/to/report.pdf").read_bytes()

    await response.status(200)
    await response.headers({"content-type": "application/pdf"})
    await response.writer.write(pdf_data)
    await response.writer.close_async()

fn_ref = iii_client.register_function({"id": "api.download.pdf"}, handler)
iii_client.register_trigger({
    "type": "http",
    "function_id": "api.download.pdf",
    "config": {
        "api_path": "/download/report",
        "http_method": "GET",
    },
})
use iii_sdk::{III, IIIError, ChannelWriter, extract_channel_refs, ChannelDirection, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::{json, Value};

let iii_for_handler = iii.clone();
iii.register_function(RegisterFunctionMessage { id: "api.download.pdf".into(), ..Default::default() }, move |input: Value| {
    let iii = iii_for_handler.clone();
    async move {
        let pdf_data = std::fs::read("/path/to/report.pdf")
            .map_err(|e| IIIError::Handler(e.to_string()))?;

        let refs = extract_channel_refs(&input);
        let writer_ref = refs.iter()
            .find(|(_, r)| matches!(r.direction, ChannelDirection::Write))
            .map(|(_, r)| r.clone())
            .expect("missing writer ref");

        let writer = ChannelWriter::new(iii.address(), &writer_ref);

        writer.send_message(&serde_json::to_string(
            &json!({"type": "set_status", "status_code": 200})
        ).unwrap()).await.map_err(|e| IIIError::Handler(e.to_string()))?;

        writer.send_message(&serde_json::to_string(
            &json!({"type": "set_headers", "headers": {"content-type": "application/pdf"}})
        ).unwrap()).await.map_err(|e| IIIError::Handler(e.to_string()))?;

        writer.write(&pdf_data).await.map_err(|e| IIIError::Handler(e.to_string()))?;
        writer.close().await.map_err(|e| IIIError::Handler(e.to_string()))?;

        Ok(Value::Null)
    }
});

iii.register_trigger(RegisterTriggerInput { type_: "http".into(), function_id: "api.download.pdf".into(), config: json!({
    "api_path": "/download/report",
    "http_method": "GET",
}), ..Default::default() })?;

Server-Sent Events (SSE)

SSE lets you push events from the server to the client over a single HTTP connection. Set the appropriate headers and write SSE frames to the response stream:

import { http } from 'iii-sdk'
import type { HttpRequest, HttpResponse } from 'iii-sdk'

const fn = iii.registerFunction(
  { id: 'api.events' },
  http(async (_req: HttpRequest, response: HttpResponse) => {
    response.status(200)
    response.headers({
      'content-type': 'text/event-stream',
      'cache-control': 'no-cache',
      connection: 'keep-alive',
    })

    const events = [
      { id: '1', type: 'message', data: 'Hello, world!' },
      { id: '2', type: 'update', data: JSON.stringify({ count: 42 }) },
      { id: '3', type: 'done', data: 'goodbye' },
    ]

    for (const event of events) {
      let frame = ''
      frame += `id: ${event.id}\n`
      frame += `event: ${event.type}\n`
      for (const line of event.data.split('\n')) {
        frame += `data: ${line}\n`
      }
      frame += '\n'

      response.stream.write(Buffer.from(frame))
    }

    response.stream.end()
  }),
)

iii.registerTrigger({
  type: 'http',
  function_id: fn.id,
  config: {
    api_path: '/events',
    http_method: 'GET',
  },
})
import asyncio
import json
from iii import http
from iii.types import HttpRequest, HttpResponse

@http
async def handler(req: HttpRequest, response: HttpResponse):
    await response.status(200)
    await response.headers({
        "content-type": "text/event-stream",
        "cache-control": "no-cache",
        "connection": "keep-alive",
    })

    events = [
        {"id": "1", "type": "message", "data": "Hello, world!"},
        {"id": "2", "type": "update", "data": json.dumps({"count": 42})},
        {"id": "3", "type": "done", "data": "goodbye"},
    ]

    for event in events:
        frame = ""
        frame += f"id: {event['id']}\n"
        frame += f"event: {event['type']}\n"
        for line in event["data"].split("\n"):
            frame += f"data: {line}\n"
        frame += "\n"

        await response.writer.write(frame.encode("utf-8"))
        # Yield to the event loop between frames; time.sleep() would block it.
        await asyncio.sleep(0.05)

    await response.writer.close_async()

fn_ref = iii_client.register_function({"id": "api.events"}, handler)
iii_client.register_trigger({
    "type": "http",
    "function_id": "api.events",
    "config": {
        "api_path": "/events",
        "http_method": "GET",
    },
})
use std::time::Duration;
use iii_sdk::{III, IIIError, ChannelWriter, extract_channel_refs, ChannelDirection, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::{json, Value};

let events = vec![
    json!({"id": "1", "type": "message", "data": "Hello, world!"}),
    json!({"id": "2", "type": "update", "data": serde_json::to_string(&json!({"count": 42})).unwrap()}),
    json!({"id": "3", "type": "done", "data": "goodbye"}),
];

let iii_for_handler = iii.clone();
iii.register_function(RegisterFunctionMessage { id: "api.events".into(), ..Default::default() }, move |input: Value| {
    let iii = iii_for_handler.clone();
    let events = events.clone();
    async move {
        let refs = extract_channel_refs(&input);
        let writer_ref = refs.iter()
            .find(|(_, r)| matches!(r.direction, ChannelDirection::Write))
            .map(|(_, r)| r.clone())
            .expect("missing writer ref");

        let writer = ChannelWriter::new(iii.address(), &writer_ref);

        writer.send_message(&serde_json::to_string(
            &json!({"type": "set_status", "status_code": 200})
        ).unwrap()).await.map_err(|e| IIIError::Handler(e.to_string()))?;

        writer.send_message(&serde_json::to_string(&json!({
            "type": "set_headers", "headers": {
                "content-type": "text/event-stream",
                "cache-control": "no-cache",
                "connection": "keep-alive",
            }
        })).unwrap()).await.map_err(|e| IIIError::Handler(e.to_string()))?;

        for event in &events {
            let mut frame = String::new();
            frame.push_str(&format!("id: {}\n", event["id"].as_str().unwrap()));
            frame.push_str(&format!("event: {}\n", event["type"].as_str().unwrap()));
            for line in event["data"].as_str().unwrap().split('\n') {
                frame.push_str(&format!("data: {line}\n"));
            }
            frame.push('\n');

            writer.write(frame.as_bytes()).await
                .map_err(|e| IIIError::Handler(e.to_string()))?;
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        writer.close().await.map_err(|e| IIIError::Handler(e.to_string()))?;
        Ok(Value::Null)
    }
});

iii.register_trigger(RegisterTriggerInput { type_: "http".into(), function_id: "api.events".into(), config: json!({
    "api_path": "/events",
    "http_method": "GET",
}), ..Default::default() })?;

Each SSE frame follows the standard format: id, event, and data fields separated by newlines, with a blank line marking the end of a frame. Multi-line data values are split across multiple data: lines.
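
The frame-building loop that the handlers above repeat inline can be factored into one helper. The sketch below is independent of the SDK; the name format_sse_frame and its parameter names are hypothetical.

```python
from typing import Optional


def format_sse_frame(
    data: str, event: Optional[str] = None, event_id: Optional[str] = None
) -> bytes:
    """Encode one SSE frame: optional id/event lines, one data line per input line."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    for line in data.split("\n"):
        lines.append(f"data: {line}")  # multi-line data becomes several data: lines
    # Each field line ends with a newline; the extra blank line ends the frame.
    return ("\n".join(lines) + "\n\n").encode("utf-8")


print(format_sse_frame("first\nsecond", event="update", event_id="2"))
# b'id: 2\nevent: update\ndata: first\ndata: second\n\n'
```

A handler could then pass the returned bytes straight to the response writer, one frame per event.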


Queue Trigger (queue)

Executes functions when events are published to subscribed topics.

Provided by: Queue Module

Configuration:

{
  type: 'queue',
  function_id: fn.id,
  config: {
    topic: 'user.created'
  }
}
{
    'type': 'queue',
    'function_id': fn.id,
    'config': {
        'topic': 'user.created',
    },
}
json!({
    "type": "queue",
    "function_id": "users.on_created",
    "config": {
        "topic": "user.created"
    }
})

Input: Event payload (any JSON data)

Output: Function result (optional, fire-and-forget pattern supported)

Conditions: Optional. Add condition_function_id to config. See Trigger Conditions.

Multiple Topics: Register separate triggers for each topic.
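
One way to register a trigger per topic is to build the trigger inputs in a loop. The sketch below only constructs the dictionaries in the shape shown in the configuration above; the helper name queue_triggers, the function ID users.on_change, and the topic user.updated are hypothetical.

```python
from typing import Any, Dict, Iterable, List


def queue_triggers(function_id: str, topics: Iterable[str]) -> List[Dict[str, Any]]:
    """Build one queue trigger input per topic, all routed to the same function."""
    return [
        {"type": "queue", "function_id": function_id, "config": {"topic": topic}}
        for topic in topics
    ]


triggers = queue_triggers("users.on_change", ["user.created", "user.updated"])
for trigger in triggers:
    # In a worker, each input would be passed to iii.register_trigger(trigger).
    print(trigger)
```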

Queue Module

Learn more about the Queue trigger


Cron Trigger (cron)

Executes functions on a time-based schedule using cron expressions.

Provided by: Cron Module

Configuration:

{
  type: 'cron',
  function_id: fn.id,
  config: {
    expression: '0 2 * * *'
  }
}
{
    'type': 'cron',
    'function_id': fn.id,
    'config': {
        'expression': '0 2 * * *',
    },
}
json!({
    "type": "cron",
    "function_id": "reports.daily",
    "config": {
        "expression": "0 2 * * *"
    }
})

Input: Cron execution context (timestamp, trigger info)

Output: Function result

Conditions: Optional. Add condition_function_id to config. See Trigger Conditions.

Cron Expression: Standard 5-field format (minute hour day month weekday)
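
As a quick aid for reading expressions in this format, the following sketch splits a 5-field expression into named fields. It performs no validation of the field values and is not the Cron Module's parser; the helper name is hypothetical.

```python
from typing import Dict

CRON_FIELDS = ("minute", "hour", "day", "month", "weekday")


def split_cron_expression(expression: str) -> Dict[str, str]:
    """Map each of the five space-separated fields to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


print(split_cron_expression("0 2 * * *"))
# {'minute': '0', 'hour': '2', 'day': '*', 'month': '*', 'weekday': '*'}
```

Read this way, the expression '0 2 * * *' from the configuration above means minute 0 of hour 2, every day: a daily run at 02:00.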

Cron Module

Learn more about the Cron trigger


Log Trigger (log)

Executes functions when log entries match specified criteria.

Provided by: Observability Module

Configuration:

{
  type: 'log',
  function_id: fn.id,
  config: {
    level: 'error'
  }
}
{
    'type': 'log',
    'function_id': fn.id,
    'config': {
        'level': 'error',
    },
}
json!({
    "type": "log",
    "function_id": "alerts.on_error",
    "config": {
        "level": "error"
    }
})

Input: Log entry with trace_id, message, level, function_name, date

Output: Function result (useful for alerting, metrics)

Log Levels: info, warn, error, debug (omit to receive all levels)
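
For the alerting use case, a handler typically turns the log entry into a short message. The sketch below uses the field names listed under Input; the sample values, including the date format, are made up for illustration, and the helper name format_alert is hypothetical.

```python
from typing import Any, Dict


def format_alert(entry: Dict[str, Any]) -> str:
    """Build a one-line alert from the log entry fields listed above."""
    return (
        f"[{str(entry['level']).upper()}] {entry['function_name']}: "
        f"{entry['message']} (trace_id={entry['trace_id']}, date={entry['date']})"
    )


# Hypothetical entry; real values and types depend on the Observability Module.
sample_entry = {
    "trace_id": "abc123",
    "message": "payment failed",
    "level": "error",
    "function_name": "orders.process",
    "date": "2024-01-01T00:00:00Z",
}
print(format_alert(sample_entry))
# [ERROR] orders.process: payment failed (trace_id=abc123, date=2024-01-01T00:00:00Z)
```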

Observability Module

Learn more about the Log trigger


Stream Triggers (stream:join, stream:leave)

Executes functions when clients connect to or disconnect from streams.

Provided by: Stream Module

Configuration:

{
  type: 'stream:join',
  function_id: fn.id,
  config: {}
}

{
  type: 'stream:leave',
  function_id: fn.id,
  config: {}
}

Input: Subscription info with stream_name, group_id, item_id, context

Output: Function result (useful for access control, analytics)

Conditions: Optional. Add condition_function_id to config for stream:join/stream:leave. See Trigger Conditions.

Stream Module

Learn more about Stream triggers

Trigger Type Comparison

| Trigger Type | Use Case | Synchronous | Multiple Subscribers |
| --- | --- | --- | --- |
| http | HTTP endpoints | ✓ Yes | ✗ No (1:1 mapping) |
| queue | Pub/sub messaging | ✗ No | ✓ Yes |
| cron | Scheduled tasks | ✗ No | ✗ No (distributed lock) |
| log | Log monitoring | ✗ No | ✓ Yes |
| stream:join | Stream connections | ✗ No | ✓ Yes |
| stream:leave | Stream disconnections | ✗ No | ✓ Yes |

Registering Triggers

Triggers are registered by workers after establishing a connection to the engine:

Step 1: Register Function

First, register the function that will be invoked:

const fn = iii.registerFunction({ id: 'users.create' }, async (data) => {
  return { id: '123', ...data }
})
def create_user(data):
    return {'id': '123', **data}

fn = iii.register_function({"id": "users.create"}, create_user)
use iii_sdk::RegisterFunctionMessage;
use serde_json::{json, Value};

iii.register_function(RegisterFunctionMessage { id: "users.create".into(), ..Default::default() }, |data: Value| async move {
    Ok(json!({"id": "123", "name": data["name"]}))
});

Step 2: Register Trigger

Then, register a trigger that routes to that function:

iii.registerTrigger({
  type: 'http',
  function_id: fn.id,
  config: {
    api_path: '/users',
    http_method: 'POST',
  },
})
iii.register_trigger({
    'type': 'http',
    'function_id': fn.id,
    'config': {'api_path': '/users', 'http_method': 'POST'},
})
use iii_sdk::RegisterTriggerInput;
use serde_json::json;

iii.register_trigger(RegisterTriggerInput { type_: "http".into(), function_id: "users.create".into(), config: json!({
    "api_path": "/users",
    "http_method": "POST"
}), ..Default::default() })?;

Step 3: Trigger Active

The engine sets up the trigger in the appropriate module. When an HTTP POST request comes to /users, the users.create function will be invoked.

Trigger types are registered by core modules during engine initialization.

Multiple Triggers to One Function

A single function can have multiple triggers:

const fn = iii.registerFunction({ id: 'users.notify' }, async (data) => {
  await sendNotification(data)
})

iii.registerTrigger({
  type: 'http',
  function_id: fn.id,
  config: { api_path: '/notify', http_method: 'POST' },
})

iii.registerTrigger({
  type: 'queue',
  function_id: fn.id,
  config: { topic: 'user.created' },
})

iii.registerTrigger({
  type: 'cron',
  function_id: fn.id,
  config: { expression: '0 9 * * *' },
})
def notify(data):
    send_notification(data)

fn = iii.register_function({"id": "users.notify"}, notify)

iii.register_trigger({
    'type': 'http',
    'function_id': fn.id,
    'config': {'api_path': '/notify', 'http_method': 'POST'},
})

iii.register_trigger({
    'type': 'queue',
    'function_id': fn.id,
    'config': {'topic': 'user.created'},
})

iii.register_trigger({
    'type': 'cron',
    'function_id': fn.id,
    'config': {'expression': '0 9 * * *'},
})
use iii_sdk::{RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::{json, Value};

iii.register_function(RegisterFunctionMessage { id: "users.notify".into(), ..Default::default() }, |data: Value| async move {
    send_notification(&data).await;
    Ok(json!({}))
});

iii.register_trigger(RegisterTriggerInput { type_: "http".into(), function_id: "users.notify".into(), config: json!({
    "api_path": "/notify",
    "http_method": "POST"
}), ..Default::default() })?;

iii.register_trigger(RegisterTriggerInput { type_: "queue".into(), function_id: "users.notify".into(), config: json!({
    "topic": "user.created"
}), ..Default::default() })?;

iii.register_trigger(RegisterTriggerInput { type_: "cron".into(), function_id: "users.notify".into(), config: json!({
    "expression": "0 9 * * *"
}), ..Default::default() })?;

Use Case: Send notifications via API calls, events, or scheduled jobs using the same logic.

Trigger Conditions

Triggers can optionally use a condition function to decide whether the handler should run. The engine invokes the condition with the trigger payload before the main handler. If it returns false, the handler is not invoked.

Supported trigger types: http, queue, cron, stream, state

How It Works

  1. Register a condition function that receives the same input as the handler and returns a boolean.
  2. Add condition_function_id to the trigger config with the condition function's ID.
  3. When the trigger fires, the engine calls the condition first. Only if it returns truthy does the handler run.
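
The three steps above can be modeled in a few lines. This is a toy simulation of the described behavior, assuming a plain dictionary as the function registry; fire_trigger is a hypothetical name and does not reflect how the engine is implemented.

```python
from typing import Any, Callable, Dict, Optional

Handler = Callable[[Any], Any]


def fire_trigger(
    trigger: Dict[str, Any], payload: Any, functions: Dict[str, Handler]
) -> Optional[Any]:
    """Call the condition (if configured) first; run the handler only if it is truthy."""
    condition_id = trigger["config"].get("condition_function_id")
    if condition_id is not None and not functions[condition_id](payload):
        return None  # condition was falsy: the handler is skipped
    return functions[trigger["function_id"]](payload)


functions: Dict[str, Handler] = {
    "conditions::highValue": lambda data: data.get("amount", 0) > 1000,
    "orders::processHighValue": lambda order: {"processed": order["amount"]},
}
trigger = {
    "type": "queue",
    "function_id": "orders::processHighValue",
    "config": {
        "topic": "order.placed",
        "condition_function_id": "conditions::highValue",
    },
}

print(fire_trigger(trigger, {"amount": 5000}, functions))  # {'processed': 5000}
print(fire_trigger(trigger, {"amount": 10}, functions))    # None
```

Note that the condition and the handler receive the same payload, which is why a condition can inspect request headers or event fields without any extra wiring.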

Example: HTTP Trigger with Condition

const conditionFn = iii.registerFunction(
  { id: 'conditions::requireVerified' },
  async (req) => req.headers?.['x-verified'] === 'true',
)

const fn = iii.registerFunction({ id: 'api::verifiedOnly' }, async (req) => ({
  status_code: 200,
  body: { message: 'Verified request' },
}))

iii.registerTrigger({
  type: 'http',
  function_id: fn.id,
  config: {
    api_path: '/verified',
    http_method: 'POST',
    condition_function_id: conditionFn.id,
  },
})
def require_verified(req):
    return req.get('headers', {}).get('x-verified') == 'true'

condition_fn = iii.register_function({"id": "conditions::requireVerified"}, require_verified)

def verified_only(req):
    return {'status_code': 200, 'body': {'message': 'Verified request'}}

fn = iii.register_function({"id": "api::verifiedOnly"}, verified_only)

iii.register_trigger({
    'type': 'http',
    'function_id': fn.id,
    'config': {
        'api_path': '/verified',
        'http_method': 'POST',
        'condition_function_id': condition_fn.id,
    },
})
use iii_sdk::{RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::{json, Value};

iii.register_function(RegisterFunctionMessage { id: "conditions::requireVerified".into(), ..Default::default() }, |req: Value| async move {
    let verified = req["headers"]["x-verified"].as_str() == Some("true");
    Ok(json!(verified))
});

iii.register_function(RegisterFunctionMessage { id: "api::verifiedOnly".into(), ..Default::default() }, |_req: Value| async move {
    Ok(json!({"status_code": 200, "body": {"message": "Verified request"}}))
});

iii.register_trigger(RegisterTriggerInput { type_: "http".into(), function_id: "api::verifiedOnly".into(), config: json!({
    "api_path": "/verified",
    "http_method": "POST",
    "condition_function_id": "conditions::requireVerified"
}), ..Default::default() })?;

Example: Queue Trigger with Condition

const conditionFn = iii.registerFunction(
  { id: 'conditions::highValue' },
  async (data) => (data?.amount ?? 0) > 1000,
)

const fn = iii.registerFunction({ id: 'orders::processHighValue' }, async (order) => {
  await processHighValueOrder(order)
  return {}
})

iii.registerTrigger({
  type: 'queue',
  function_id: fn.id,
  config: {
    topic: 'order.placed',
    condition_function_id: conditionFn.id,
  },
})
def high_value(data):
    return (data.get('amount', 0)) > 1000

condition_fn = iii.register_function({"id": "conditions::highValue"}, high_value)

def process_high_value(order):
    process_high_value_order(order)
    return {}

fn = iii.register_function({"id": "orders::processHighValue"}, process_high_value)

iii.register_trigger({
    'type': 'queue',
    'function_id': fn.id,
    'config': {
        'topic': 'order.placed',
        'condition_function_id': condition_fn.id,
    },
})
use iii_sdk::{RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::{json, Value};

iii.register_function(RegisterFunctionMessage { id: "conditions::highValue".into(), ..Default::default() }, |data: Value| async move {
    let amount = data["amount"].as_f64().unwrap_or(0.0);
    Ok(json!(amount > 1000.0))
});

iii.register_function(RegisterFunctionMessage { id: "orders::processHighValue".into(), ..Default::default() }, |order: Value| async move {
    process_high_value_order(&order).await;
    Ok(json!({}))
});

iii.register_trigger(RegisterTriggerInput { type_: "queue".into(), function_id: "orders::processHighValue".into(), config: json!({
    "topic": "order.placed",
    "condition_function_id": "conditions::highValue"
}), ..Default::default() })?;

Example: State Trigger with Condition

const conditionFn = iii.registerFunction(
  { id: 'conditions::profileChanged' },
  async (event) => event.event_type === 'updated' && event.key === 'profile',
)

const fn = iii.registerFunction({ id: 'state::onProfileUpdate' }, async (event) => {
  console.log('Profile updated:', event.new_value)
  return {}
})

iii.registerTrigger({
  type: 'state',
  function_id: fn.id,
  config: {
    scope: 'users',
    condition_function_id: conditionFn.id,
  },
})
def profile_changed(event):
    return event.get('event_type') == 'updated' and event.get('key') == 'profile'

condition_fn = iii.register_function({"id": "conditions::profileChanged"}, profile_changed)

def on_profile_update(event):
    print('Profile updated:', event.get('new_value'))
    return {}

fn = iii.register_function({"id": "state::onProfileUpdate"}, on_profile_update)

iii.register_trigger({
    'type': 'state',
    'function_id': fn.id,
    'config': {
        'scope': 'users',
        'condition_function_id': condition_fn.id,
    },
})
use iii_sdk::{RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::{json, Value};

iii.register_function(RegisterFunctionMessage { id: "conditions::profileChanged".into(), ..Default::default() }, |event: Value| async move {
    let is_updated = event["event_type"].as_str() == Some("updated");
    let is_profile = event["key"].as_str() == Some("profile");
    Ok(json!(is_updated && is_profile))
});

iii.register_function(RegisterFunctionMessage { id: "state::onProfileUpdate".into(), ..Default::default() }, |event: Value| async move {
    println!("Profile updated: {:?}", event["new_value"]);
    Ok(json!({}))
});

iii.register_trigger(RegisterTriggerInput { type_: "state".into(), function_id: "state::onProfileUpdate".into(), config: json!({
    "scope": "users",
    "condition_function_id": "conditions::profileChanged"
}), ..Default::default() })?;

Config Key

All trigger types that support conditions use condition_function_id in the trigger config to reference the condition function.

Trigger Lifecycle

Unregistering Triggers

Remove triggers dynamically. registerTrigger returns a trigger object with unregister():

const fn = iii.registerFunction({ id: 'api.temp' }, async () => ({ status_code: 200, body: {} }))
const trigger = iii.registerTrigger({
  type: 'http',
  function_id: fn.id,
  config: { api_path: '/temp', http_method: 'GET' },
})

trigger.unregister()
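
When a trigger should only exist for the duration of some work, it helps to guarantee that unregister() runs even if that work fails. The sketch below shows the pattern with a stub in place of the SDK. This page documents unregister() only for the TypeScript SDK, so treating a Python registration call as returning an object with unregister() is an assumption; StubTrigger, fake_register, and temporary_trigger are hypothetical names.

```python
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, List


class StubTrigger:
    """Hypothetical stand-in for the object returned by trigger registration."""

    def __init__(self) -> None:
        self.active = True

    def unregister(self) -> None:
        self.active = False


@contextmanager
def temporary_trigger(
    register: Callable[[Dict[str, Any]], Any], trigger_input: Dict[str, Any]
) -> Iterator[Any]:
    """Register a trigger, yield it, and always unregister it afterwards."""
    trigger = register(trigger_input)
    try:
        yield trigger
    finally:
        trigger.unregister()  # runs even if the body raises


registered: List[StubTrigger] = []


def fake_register(trigger_input: Dict[str, Any]) -> StubTrigger:
    trigger = StubTrigger()
    registered.append(trigger)
    return trigger


temp_input = {
    "type": "http",
    "function_id": "api.temp",
    "config": {"api_path": "/temp", "http_method": "GET"},
}
with temporary_trigger(fake_register, temp_input) as temp:
    print(temp.active)  # True while the block runs
print(temp.active)      # False after the block exits
```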

Usage Patterns

Webhook Handler

const fn = iii.registerFunction({ id: 'webhooks.github' }, async (req) => {
  const event = req.headers?.['x-github-event']
  const payload = req.body

  await processGitHubWebhook(event, payload)

  return { status_code: 200, body: { received: true } }
})

iii.registerTrigger({
  type: 'http',
  function_id: fn.id,
  config: {
    api_path: '/webhooks/github',
    http_method: 'POST',
  },
})
def github_webhook(req):
    event = req.get('headers', {}).get('x-github-event')
    payload = req.get('body')

    process_github_webhook(event, payload)

    return {'status_code': 200, 'body': {'received': True}}

fn = iii.register_function({"id": "webhooks.github"}, github_webhook)

iii.register_trigger({
    'type': 'http',
    'function_id': fn.id,
    'config': {
        'api_path': '/webhooks/github',
        'http_method': 'POST',
    },
})
use iii_sdk::{RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::{json, Value};

iii.register_function(RegisterFunctionMessage { id: "webhooks.github".into(), ..Default::default() }, |req: Value| async move {
    let event = req["headers"]["x-github-event"].as_str().unwrap_or("");
    let payload = &req["body"];

    process_github_webhook(event, payload).await;

    Ok(json!({"status_code": 200, "body": {"received": true}}))
});

iii.register_trigger(RegisterTriggerInput { type_: "http".into(), function_id: "webhooks.github".into(), config: json!({
    "api_path": "/webhooks/github",
    "http_method": "POST"
}), ..Default::default() })?;

Event Chain

import { TriggerAction } from 'iii-sdk'

const processFn = iii.registerFunction({ id: 'orders.process' }, async (order) => {
  await saveOrder(order)
  iii.trigger({
    function_id: 'enqueue',
    payload: { topic: 'order.processed', data: order },
    action: TriggerAction.Void(),
  })
})

iii.registerTrigger({
  type: 'queue',
  function_id: processFn.id,
  config: { topic: 'order.placed' },
})

const confirmFn = iii.registerFunction({ id: 'orders.sendConfirmation' }, async (order) => {
  await sendEmail(order.email, 'Order confirmed')
})

iii.registerTrigger({
  type: 'queue',
  function_id: confirmFn.id,
  config: { topic: 'order.processed' },
})
from iii import TriggerAction

def process_order(order):
    save_order(order)
    iii.trigger({
        'function_id': 'enqueue',
        'payload': {
            'topic': 'order.processed',
            'data': order,
        },
        'action': TriggerAction.Void(),
    })

process_fn = iii.register_function({"id": "orders.process"}, process_order)

iii.register_trigger({
    'type': 'queue',
    'function_id': process_fn.id,
    'config': {'topic': 'order.placed'},
})

def send_confirmation(order):
    send_email(order['email'], 'Order confirmed')

confirm_fn = iii.register_function({"id": "orders.sendConfirmation"}, send_confirmation)

iii.register_trigger({
    'type': 'queue',
    'function_id': confirm_fn.id,
    'config': {'topic': 'order.processed'},
})
use iii_sdk::{TriggerAction, TriggerRequest, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::{json, Value};

let iii_clone = iii.clone();
iii.register_function(RegisterFunctionMessage { id: "orders.process".into(), ..Default::default() }, move |order: Value| {
    let iii = iii_clone.clone();
    async move {
        save_order(&order).await;
        iii.trigger(
            TriggerRequest::new("enqueue", json!({
                "topic": "order.processed",
                "data": order,
            }))
            .action(TriggerAction::void()),
        )
        .await?;
        Ok(json!(null))
    }
});

iii.register_trigger(RegisterTriggerInput { type_: "queue".into(), function_id: "orders.process".into(), config: json!({
    "topic": "order.placed"
}), ..Default::default() })?;

iii.register_function(RegisterFunctionMessage { id: "orders.sendConfirmation".into(), ..Default::default() }, |order: Value| async move {
    send_email(order["email"].as_str().unwrap_or(""), "Order confirmed").await;
    Ok(json!(null))
});

iii.register_trigger(RegisterTriggerInput { type_: "queue".into(), function_id: "orders.sendConfirmation".into(), config: json!({
    "topic": "order.processed"
}), ..Default::default() })?;

Scheduled Cleanup

const fn = iii.registerFunction({ id: 'maintenance.cleanup' }, async () => {
  const deleted = await deleteOldRecords()
  logger.info(`Deleted ${deleted} old records`)
})

iii.registerTrigger({
  type: 'cron',
  function_id: fn.id,
  config: {
    expression: '0 3 * * *',
  },
})
def cleanup(_context=None):  # the cron execution context is not used here
    deleted = delete_old_records()
    logger.info(f'Deleted {deleted} old records')

fn = iii.register_function({"id": "maintenance.cleanup"}, cleanup)

iii.register_trigger({
    'type': 'cron',
    'function_id': fn.id,
    'config': {
        'expression': '0 3 * * *',
    },
})
use iii_sdk::{RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::{json, Value};

iii.register_function(RegisterFunctionMessage { id: "maintenance.cleanup".into(), ..Default::default() }, |_: Value| async move {
    let deleted = delete_old_records().await;
    info!("Deleted {} old records", deleted);
    Ok(json!(null))
});

iii.register_trigger(RegisterTriggerInput { type_: "cron".into(), function_id: "maintenance.cleanup".into(), config: json!({
    "expression": "0 3 * * *"
}), ..Default::default() })?;

Best Practices

Custom Trigger Types

Modules can register custom trigger types by implementing the trigger interface:

pub struct CustomModule {
    // Module state
}

impl CoreModule for CustomModule {
    fn name(&self) -> &str {
        "custom"
    }

    async fn register_trigger_type(&self, registry: &TriggerRegistry) {
        registry.register("custom:trigger", CustomTriggerHandler);
    }
}

Next Steps

Modules

Explore modules that provide trigger types
