SDK Implementation

The iii SDK is available in both Python and Node.js, providing a bridge between your application and the iii Engine via WebSocket communication.

SDK Architecture

The SDKs follow a bridge pattern where the SDK acts as a client, maintaining a persistent WebSocket connection to the engine to exchange JSON messages for function registration, trigger setup, and remote procedure calls.

Core Bridge Architecture

The Bridge is the central component in both SDKs. It manages the WebSocket lifecycle, message routing, and the registry of local functions available to the engine.

Connection Lifecycle

The Bridge establishes a WebSocket connection to the engine (defaulting to ws://127.0.0.1:49134). Upon connection, it flushes any pending registration messages (triggers, functions, services) to ensure the engine is aware of the worker's capabilities.
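The buffering behaviour described above can be sketched in a few lines. This is a toy stand-in, not the SDK's Bridge: the class name `PendingQueueBridge`, the `send` callback, the `on_open` hook, and the `type` values in the messages are all assumptions made for illustration.

```python
import json
from typing import Callable


class PendingQueueBridge:
    """Toy stand-in for a bridge: buffers messages until the socket is open."""

    def __init__(self, send: Callable[[str], None]) -> None:
        self._send = send                # hypothetical transport callback
        self._connected = False
        self._pending: list[dict] = []   # messages queued before the connection opens

    def send_message(self, message: dict) -> None:
        """Send immediately when connected, otherwise queue for later."""
        if self._connected:
            self._send(json.dumps(message))
        else:
            self._pending.append(message)

    def on_open(self) -> None:
        """Called when the connection opens: flush in registration order."""
        self._connected = True
        while self._pending:
            self._send(json.dumps(self._pending.pop(0)))


sent: list[str] = []
bridge = PendingQueueBridge(sent.append)

# Registrations made before the connection exists are only queued.
bridge.send_message({"type": "registerfunction", "function_path": "service.action"})
bridge.send_message({"type": "registertrigger", "trigger_type": "api",
                     "function_path": "service.action"})
queued_before_open = len(sent)

# Opening the connection flushes the queue to the engine.
bridge.on_open()
```

Queuing registrations lets application code register functions and triggers without caring whether the socket is already open.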

Message Types and Protocol

The SDKs implement specific message types to communicate with the engine. These are defined as Pydantic models in Python and TypeScript interfaces in Node.js.

| Message Type | Description | Key Fields |
|---|---|---|
| RegisterFunction | Registers a callable function with the engine | function_path, description |
| RegisterTrigger | Binds a specific configuration to a trigger type | trigger_type, function_path, config |
| RegisterCondition | Registers a condition function for conditional trigger execution | function_path, condition_function_path |
| InvokeFunction | Requests the engine to execute a function | function_path, data |
| InvocationResult | Returns the result of a function execution | invocation_id, result, error |
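To make the message flow concrete, here is a minimal sketch of how a worker might answer an InvokeFunction message with an InvocationResult. It uses plain dicts rather than the SDK's Pydantic models. The `type` discriminator values and the presence of `invocation_id` on the incoming message are assumptions, as are the helper names `register_function` and `handle_invoke`.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]

# Local registry: function_path -> handler
registry: dict[str, Handler] = {}


def register_function(function_path: str, handler: Handler) -> dict:
    """Store the handler locally and build the RegisterFunction message."""
    registry[function_path] = handler
    return {"type": "registerfunction", "function_path": function_path}


async def handle_invoke(message: dict) -> dict:
    """Run the requested handler and build an InvocationResult message."""
    result, error = None, None
    handler = registry.get(message["function_path"])
    if handler is None:
        error = f"unknown function: {message['function_path']}"
    else:
        try:
            result = await handler(message.get("data"))
        except Exception as exc:  # report handler failures instead of crashing
            error = str(exc)
    return {
        "type": "invocationresult",
        "invocation_id": message.get("invocation_id"),  # assumed field
        "result": result,
        "error": error,
    }


async def my_handler(data: Any) -> dict:
    return {"result": "success", "echo": data}


register_function("service.action", my_handler)
ok = asyncio.run(handle_invoke(
    {"function_path": "service.action", "data": {"n": 1}, "invocation_id": "inv-1"}
))
missing = asyncio.run(handle_invoke(
    {"function_path": "nope", "data": None, "invocation_id": "inv-2"}
))
```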

Multi-Trigger Architecture

Steps can have multiple triggers of different types (API, Event, Cron), all routing to the same handler function. The engine assigns each trigger a unique indexed function path.

Trigger Indexing

Each trigger in a step gets a unique function path using the format: steps.{StepName}:trigger:{index}

Index: Corresponds to the trigger's position in the triggers array (0-based).
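As a small illustration of the format above, the helper below builds the indexed path for each trigger of a step. The function name `trigger_function_path` and the step name `ProcessOrder` are made up for this example.

```python
def trigger_function_path(step_name: str, index: int) -> str:
    """Build the indexed function path for one trigger of a step."""
    return f"steps.{step_name}:trigger:{index}"


# A hypothetical step with three triggers of different types.
triggers = [
    {"type": "api", "path": "/orders", "method": "POST"},
    {"type": "event", "topic": "order.created"},
    {"type": "cron", "expression": "0 * * * *"},
]

# The index is the trigger's 0-based position in the triggers array.
paths = [trigger_function_path("ProcessOrder", i) for i, _ in enumerate(triggers)]
```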

TriggerInput Normalization

All triggers normalize their input to a unified TriggerInput structure before invoking the handler:

interface TriggerInput {
  trigger: TriggerMetadata
  request: ApiRequest | null // Only for API triggers
  data: any // Payload data
}

interface TriggerMetadata {
  type: 'api' | 'event' | 'cron'
  index: number
  // Type-specific fields
  path?: string // API only
  method?: string // API only
  topic?: string // Event only
  expression?: string // Cron only
}

Benefits:

  • One handler serves multiple trigger types
  • Access trigger metadata via ctx.trigger or the trigger parameter
  • Consistent input structure across all trigger types
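The normalization step can be sketched with plain dicts that mirror the TriggerInput and TriggerMetadata shapes above. The helper names (`from_api`, `from_event`, `from_cron`) are hypothetical, and treating the request body as `data` for API triggers is an assumption rather than documented behaviour.

```python
from typing import Any, Optional


def make_trigger_input(trigger: dict, data: Any, request: Optional[dict] = None) -> dict:
    """Build the unified structure: trigger metadata, optional request, payload."""
    return {"trigger": trigger, "request": request, "data": data}


def from_api(index: int, path: str, method: str, request: dict) -> dict:
    trigger = {"type": "api", "index": index, "path": path, "method": method}
    # Assumption: for API triggers the payload is the request body.
    return make_trigger_input(trigger, request.get("body"), request)


def from_event(index: int, topic: str, payload: Any) -> dict:
    trigger = {"type": "event", "index": index, "topic": topic}
    return make_trigger_input(trigger, payload)


def from_cron(index: int, expression: str) -> dict:
    trigger = {"type": "cron", "index": index, "expression": expression}
    return make_trigger_input(trigger, None)


def handler(trigger_input: dict) -> str:
    """One handler for all trigger types, branching on the metadata."""
    trigger = trigger_input["trigger"]
    if trigger["type"] == "api":
        return f"api {trigger['method']} {trigger['path']}"
    if trigger["type"] == "event":
        return f"event {trigger['topic']}"
    return f"cron {trigger['expression']}"


api_input = from_api(0, "/orders", "POST", {"body": {"amount": 5}})
event_input = from_event(1, "order.created", {"amount": 5})
cron_input = from_cron(2, "0 * * * *")
labels = [handler(item) for item in (api_input, event_input, cron_input)]
```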

Condition Evaluation

Conditions are functions that run before the handler to determine whether execution should proceed. They receive the same TriggerInput as the handler.

Condition Function Signature

Python:

from typing import Any

from motia import FlowContext

def condition_name(input: Any, ctx: FlowContext) -> bool:
    # Evaluate based on input data
    return input.get("amount", 0) > 1000

# Or async (fetch_user is an application-defined helper, not part of the SDK)
async def async_condition(input: Any, ctx: FlowContext) -> bool:
    user = await fetch_user(input.get("user_id"))
    return user.is_verified

TypeScript:

const conditionName = (input: any, ctx: FlowContext): boolean => {
  // Evaluate based on input data
  return input?.amount > 1000
}

// Or async
const asyncCondition = async (input: any, ctx: FlowContext): Promise<boolean> => {
  const user = await fetchUser(input?.userId)
  return user.isVerified
}

Condition Registration

Conditions are registered separately from the main handler:

  1. Handler Registration: steps.{StepName}:trigger:{index}
  2. Condition Registration: steps.{StepName}:trigger:{index}:condition

The engine evaluates the condition before invoking the handler. If the condition returns false, the handler is never called.

Short-Circuit Behavior: The first false condition stops execution immediately.
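The gating and short-circuit behaviour can be illustrated locally, without the engine. In the sketch below, `run_with_conditions` is a hypothetical helper that accepts both sync and async conditions. Returning None when a condition fails is an assumption, not the engine's documented result.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_with_conditions(conditions: list[Callable], handler: Callable,
                              trigger_input: Any, ctx: Any) -> Any:
    """Evaluate conditions in order; invoke the handler only if all pass."""
    for condition in conditions:
        outcome = condition(trigger_input, ctx)
        if inspect.isawaitable(outcome):   # support async conditions
            outcome = await outcome
        if not outcome:
            return None                    # short-circuit: handler is never called
    return await handler(trigger_input, ctx)


calls: list[str] = []


def is_large(trigger_input: dict, ctx: Any) -> bool:
    calls.append("is_large")
    return trigger_input.get("amount", 0) > 1000


async def is_verified(trigger_input: dict, ctx: Any) -> bool:
    calls.append("is_verified")
    return trigger_input.get("verified", False)


async def handler(trigger_input: dict, ctx: Any) -> dict:
    calls.append("handler")
    return {"processed": trigger_input["amount"]}


# First condition fails, so nothing after it runs.
skipped = asyncio.run(
    run_with_conditions([is_large, is_verified], handler, {"amount": 10}, None))
calls_after_skip = list(calls)

# All conditions pass, so the handler runs.
calls.clear()
ran = asyncio.run(
    run_with_conditions([is_large, is_verified], handler,
                        {"amount": 5000, "verified": True}, None))
```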

See Examples: For practical examples of multi-trigger steps and conditions in action, see Motia SDK - Multiple Triggers and Trigger Conditions.

Python Implementation

The Python implementation is divided into two layers: the low-level SDK (iii) and the high-level framework (motia).

Low-Level SDK (iii)

The iii package provides the raw Bridge class and type definitions. It handles the async event loop integration using asyncio and websockets.

Installation

pip install iii

Basic Usage

import asyncio

from iii import Bridge


# Handler the engine can invoke remotely
async def my_handler(data):
    return {"result": "success"}


async def main():
    # Create and connect bridge
    bridge = Bridge()
    await bridge.connect()

    # Register a function
    bridge.register_function(
        function_path="service.action",
        handler=my_handler,
        description="My custom function",
    )

    # Register a trigger
    bridge.register_trigger(
        trigger_type="api",
        function_path="service.action",
        config={
            "api_path": "/my-endpoint",
            "http_method": "POST",
        },
    )


# `await` is only valid inside a coroutine, so drive main() with asyncio.run
asyncio.run(main())

High-Level Framework (motia)

motia builds upon iii to provide a declarative workflow definition system. It wraps raw function handlers into structured "Steps".

Installation

pip install motia

Step Architecture

A Step in Motia encapsulates configuration (Event, API, Cron) and the handler logic. The step_wrapper function automatically registers the step with the bridge and injects a FlowContext.

Context and State

The FlowContext passed to handlers provides access to:

emit (function)

A function to publish events to topics.

await ctx.emit("user.created", {"id": "123"})

trigger (TriggerMetadata)

Metadata about which trigger invoked the handler.

# Access trigger information
if ctx.trigger.type == "api":
    print(f"API {ctx.trigger.method} {ctx.trigger.path}")
elif ctx.trigger.type == "event":
    print(f"Event {ctx.trigger.topic}")
elif ctx.trigger.type == "cron":
    print(f"Cron {ctx.trigger.expression}")

# Access trigger index
print(f"Trigger index: {ctx.trigger.index}")

Fields: type, index, path, method, topic, expression (type-specific)

state (InternalStateManager)

An internal state manager for configuration and internal data.

await ctx.state.set("config", "theme", "dark")
theme = await ctx.state.get("config", "theme")

streams (dict[str, Stream])

Access to distributed streams (get/set/delete).

todo_stream = Stream("todos")
await todo_stream.set("inbox", "item-1", {"title": "Buy milk"})

logger (Logger)

Context-aware logging with trace ID.

ctx.logger.info("Processing started")
ctx.logger.error("An error occurred", exc_info=True)

Node.js Implementation

The Node.js SDK (@iii-dev/sdk) is written in TypeScript for type safety. It follows the same bridge pattern as the Python SDK but integrates through callbacks and hooks.

Installation

npm install @iii-dev/sdk

Bridge Implementation

The Node.js Bridge class uses the ws library. It queues messages sent before the socket is open (messagesToSend) and retries the connection automatically at intervals.

import { createBridge } from '@iii-dev/sdk'

// Create bridge instance
const bridge = createBridge({
  url: 'ws://127.0.0.1:49134',
})

// Connect to engine
await bridge.connect()

// Register a function
bridge.registerFunction({
  function_path: 'service.action',
  handler: async (data) => {
    return { result: 'success' }
  },
  description: 'My custom function',
})

// Register a trigger
bridge.registerTrigger({
  trigger_type: 'api',
  function_path: 'service.action',
  config: {
    api_path: '/my-endpoint',
    http_method: 'POST',
  },
})

Conceptual Structure

// Conceptual structure based on packages/node/iii/src/bridge.ts
class Bridge {
  private ws?: WebSocket
  private functions = new Map<string, RemoteFunctionData>()
  private messagesToSend: BridgeMessage[] = []

  registerFunction(message, handler) {
    this.sendMessage(MessageType.RegisterFunction, message, true)
    this.functions.set(message.function_path, { message, handler })
  }
}

React-like Hooks

The Node ecosystem includes higher-level abstractions that mimic React hooks for defining triggers.

Stream Management

Both SDKs provide abstractions for interacting with the Engine's Stream module, which handles distributed state and grouping.

Stream Operations

Stream operations are exposed as functions in the streams.* namespace and are invoked over the bridge.

| Operation | SDK Function | Bridge Target | Description |
|---|---|---|---|
| Get | stream.get(group, id) | streams.get | Retrieves an item from a group |
| Set | stream.set(group, id, val) | streams.set | Saves an item to a group |
| Delete | stream.delete(group, id) | streams.delete | Removes an item |
| Get Group | stream.get_group(group) | streams.getGroup | Retrieves all items in a group |
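To show the group/item semantics of these four operations without a running engine, here is an in-memory stand-in. `InMemoryStream` is a hypothetical class for illustration only. The real Stream forwards each call over the bridge, and the return shapes used here (for example a list from `get_group`) are assumptions.

```python
import asyncio
from typing import Any, Optional


class InMemoryStream:
    """Local stand-in mirroring get/set/delete/get_group on grouped items."""

    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name
        self._groups: dict[str, dict[str, Any]] = {}  # group_id -> item_id -> value

    async def get(self, group_id: str, item_id: str) -> Optional[Any]:
        return self._groups.get(group_id, {}).get(item_id)

    async def set(self, group_id: str, item_id: str, value: Any) -> Any:
        self._groups.setdefault(group_id, {})[item_id] = value
        return value

    async def delete(self, group_id: str, item_id: str) -> None:
        self._groups.get(group_id, {}).pop(item_id, None)

    async def get_group(self, group_id: str) -> list[Any]:
        return list(self._groups.get(group_id, {}).values())


async def demo() -> dict:
    todos = InMemoryStream("todos")
    await todos.set("inbox", "item-1", {"title": "Buy milk"})
    await todos.set("inbox", "item-2", {"title": "Call Bob"})
    fetched = await todos.get("inbox", "item-1")
    await todos.delete("inbox", "item-2")
    return {
        "fetched": fetched,
        "remaining": await todos.get_group("inbox"),
        "deleted": await todos.get("inbox", "item-2"),
    }


results = asyncio.run(demo())
```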

Python Stream Class

In Python, the Stream class is generic (Stream[TData]) and lazy-loads the bridge instance to perform RPC calls.

from motia import Stream

# Define stream
todo_stream = Stream("todos")

# Get item: simplified excerpt of the get method as defined inside the Stream class
async def get(self, group_id: str, item_id: str) -> TData | None:
    return await self._get_bridge().invoke_function(
        "streams.get",
        {
            "stream_name": self.stream_name,
            "group_id": group_id,
            "item_id": item_id,
        },
    )

Node.js Stream Usage

import { Stream } from '@iii-dev/sdk'

// Define stream
const todoStream = new Stream('todos')

// Set item
await todoStream.set('inbox', 'item-1', {
  title: 'Buy milk',
  completed: false,
})

// Get item
const item = await todoStream.get('inbox', 'item-1')

// Get all items in group
const allItems = await todoStream.getGroup('inbox')

Request and Response Models

The SDKs standardize how HTTP requests and responses are handled when using API triggers.

API Request

The ApiRequest model normalizes incoming HTTP data.

path_params (dict)

Dictionary of URL path parameters.

# Route: /users/:userId/posts/:postId
# Request: GET /users/123/posts/456
req.path_params  # {"userId": "123", "postId": "456"}

query_params (dict)

Dictionary of query string arguments.

# Request: GET /users?status=active&limit=10
req.query_params  # {"status": "active", "limit": "10"}

body (Any)

The request payload (parsed JSON).

# POST /users with {"name": "Alice"}
req.body  # {"name": "Alice"}

headers (dict)

Request headers.

req.headers.get("authorization")
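For intuition about where path_params and query_params come from, the sketch below derives both from a route pattern and a request URL using only the standard library. `match_route` is a hypothetical helper. In the SDK this parsing happens before the handler sees the request, and the actual matching rules may differ.

```python
from typing import Optional
from urllib.parse import parse_qsl, urlsplit


def match_route(route: str, url: str) -> Optional[dict]:
    """Match a ':param' style route against a URL and extract parameters."""
    parts = urlsplit(url)
    route_segments = route.strip("/").split("/")
    path_segments = parts.path.strip("/").split("/")
    if len(route_segments) != len(path_segments):
        return None

    path_params: dict[str, str] = {}
    for pattern, actual in zip(route_segments, path_segments):
        if pattern.startswith(":"):
            path_params[pattern[1:]] = actual   # named placeholder
        elif pattern != actual:
            return None                         # literal segment mismatch

    # Query values stay strings, as in the query_params example above.
    return {"path_params": path_params, "query_params": dict(parse_qsl(parts.query))}


posts = match_route("/users/:userId/posts/:postId", "/users/123/posts/456")
listing = match_route("/users", "/users?status=active&limit=10")
no_match = match_route("/users/:userId", "/orders/1")
```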

API Response

The ApiResponse model dictates how handlers should return data to the engine to be sent back to the HTTP client.

status (int)

HTTP status code.

return ApiResponse(status=200, body={...})

body (Any)

Response payload (will be serialized to JSON).

return ApiResponse(status=200, body={"message": "Success"})

headers (dict)

Response headers (optional).

return ApiResponse(
    status=200,
    body={...},
    headers={"X-Custom-Header": "value"}
)

Middleware Composition (Python)

The Motia framework includes logic to compose middleware for API steps. This allows intercepting requests before they reach the main handler.

The _compose_middleware function recursively wraps handlers, creating an execution chain where each middleware can perform actions before and after the next handler is awaited.
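The recursive wrapping can be sketched as follows. This is an illustrative reimplementation under the assumption that each middleware has the signature `(req, ctx, next_handler)`. It is not the framework's actual `_compose_middleware` source.

```python
import asyncio
from typing import Any, Callable


def compose_middleware(middleware: list[Callable], handler: Callable) -> Callable:
    """Wrap handler so each middleware runs around the rest of the chain."""
    if not middleware:
        return handler
    first, rest = middleware[0], middleware[1:]
    next_handler = compose_middleware(rest, handler)  # recurse on the tail

    async def wrapped(req: Any, ctx: Any) -> Any:
        return await first(req, ctx, next_handler)

    return wrapped


order: list[str] = []


def make_logging_middleware(name: str) -> Callable:
    async def middleware(req: Any, ctx: Any, next_handler: Callable) -> Any:
        order.append(f"{name}:before")
        response = await next_handler(req, ctx)
        order.append(f"{name}:after")
        return response
    return middleware


async def handler(req: Any, ctx: Any) -> dict:
    order.append("handler")
    return {"status": 200}


chain = compose_middleware(
    [make_logging_middleware("outer"), make_logging_middleware("inner")], handler)
response = asyncio.run(chain({"path": "/protected"}, None))
```

The first middleware in the list ends up outermost, so it sees the request first and the response last.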

Example Middleware

from motia import StepConfig, ApiTrigger, ApiRequest, ApiResponse, FlowContext, step_wrapper

# Define middleware
async def auth_middleware(req: ApiRequest, ctx: FlowContext, next_handler):
    # Before handler
    token = req.headers.get("authorization")
    if not token:
        return ApiResponse(status=401, body={"error": "Unauthorized"})

    # Validate token (validate_token is an application-defined helper, not part of motia)
    user = await validate_token(token)
    if not user:
        return ApiResponse(status=401, body={"error": "Invalid token"})

    # Inject into context
    ctx.user = user

    # Call next handler
    response = await next_handler(req, ctx)

    # After handler (optional)
    response.headers["X-Request-ID"] = ctx.trace_id

    return response

# Use middleware in step
config = StepConfig(
    name="protected-route",
    triggers=[
        ApiTrigger(
            path="/protected",
            method="GET",
            middleware=[auth_middleware]
        )
    ]
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # ctx.user is available here
    return ApiResponse(
        status=200,
        body={"message": f"Hello, {ctx.user['name']}"}
    )

step_wrapper(config, __file__, handler)

Complete Examples

Python Example

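import asyncio

from iii import Bridge


# Handler for the users.create function
async def create_user(data):
    user = {"id": "123", **data}
    return {"status": 201, "body": user}


async def main():
    # Create bridge
    bridge = Bridge()

    # Register function
    bridge.register_function(
        function_path="users.create",
        handler=create_user,
    )

    # Register API trigger
    bridge.register_trigger(
        trigger_type="api",
        function_path="users.create",
        config={
            "api_path": "/users",
            "http_method": "POST",
        },
    )

    # Connect and run
    await bridge.connect()
    # Assumption: if connect() returns once the socket is open, keep the event
    # loop alive here, for example with `await asyncio.Event().wait()`.


# `await` is only valid inside a coroutine, so drive main() with asyncio.run
asyncio.run(main())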

Node.js Example

import { createBridge } from '@iii-dev/sdk'

const bridge = createBridge()

// Register function
bridge.registerFunction({
  function_path: 'users.create',
  handler: async (data) => {
    const user = { id: '123', ...data }
    return { status: 201, body: user }
  },
})

// Register API trigger
bridge.registerTrigger({
  trigger_type: 'api',
  function_path: 'users.create',
  config: {
    api_path: '/users',
    http_method: 'POST',
  },
})

// Connect and run
await bridge.connect()

