SDK Implementation
The iii SDK is available in both Python and Node.js, providing a bridge between your application and the iii Engine via WebSocket communication.
SDK Architecture
The SDKs follow a bridge pattern where the SDK acts as a client, maintaining a persistent WebSocket connection to the engine to exchange JSON messages for function registration, trigger setup, and remote procedure calls.
Core Bridge Architecture
The Bridge is the central component in both SDKs. It manages the WebSocket lifecycle, message routing, and the registry of local functions available to the engine.
Connection Lifecycle
The Bridge establishes a WebSocket connection to the engine (defaulting to ws://127.0.0.1:49134). Upon connection, it flushes any pending registration messages (triggers, functions, services) to ensure the engine is aware of the worker's capabilities.
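The flush-on-connect behavior described above can be sketched in a few lines. This is a minimal illustration rather than the SDK's implementation: the QueueingBridge class, its method names, and the "type" values in the messages are assumptions made for the example, and a plain callable stands in for the WebSocket.

```python
import json
from typing import Callable, Optional


class QueueingBridge:
    """Hypothetical stand-in for the Bridge; not the SDK's actual class."""

    def __init__(self) -> None:
        self._send: Optional[Callable[[str], None]] = None  # set once "connected"
        self._pending: list[dict] = []  # messages queued while disconnected

    def send_message(self, message: dict) -> None:
        # Queue the message if the socket is not open yet; otherwise send it now.
        if self._send is None:
            self._pending.append(message)
        else:
            self._send(json.dumps(message))

    def on_open(self, send: Callable[[str], None]) -> None:
        # Called when the connection opens: flush queued messages in order.
        self._send = send
        pending, self._pending = self._pending, []
        for message in pending:
            send(json.dumps(message))


sent: list[str] = []  # records what would go over the wire
bridge = QueueingBridge()

# Registrations made before connecting are held back (message shape is assumed).
bridge.send_message({"type": "RegisterFunction", "function_path": "service.action"})
bridge.send_message({"type": "RegisterTrigger", "trigger_type": "api",
                     "function_path": "service.action", "config": {}})

bridge.on_open(sent.append)  # simulate the WebSocket opening
```

Queuing registrations this way lets callers register functions and triggers before or after connecting without caring about ordering.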
Message Types and Protocol
The SDKs implement specific message types to communicate with the engine. These are defined as Pydantic models in Python and TypeScript interfaces in Node.js.
| Message Type | Description | Key Fields |
|---|---|---|
| RegisterFunction | Registers a callable function with the engine | function_path, description |
| RegisterTrigger | Binds a specific configuration to a trigger type | trigger_type, function_path, config |
| RegisterCondition | Registers a condition function for conditional trigger execution | function_path, condition_function_path |
| InvokeFunction | Requests the engine to execute a function | function_path, data |
| InvocationResult | Returns the result of a function execution | invocation_id, result, error |
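To make the table concrete, the following sketch routes an InvokeFunction message to a locally registered handler and builds an InvocationResult reply. It uses plain dictionaries instead of the SDK's Pydantic models, and several details are assumptions for illustration only: the "type" discriminator field, the presence of invocation_id on the incoming message, and the helper names register_function and handle_invoke.

```python
import json
from typing import Any, Callable

# Registry of local functions, keyed by function_path.
functions: dict[str, Callable[[Any], Any]] = {}


def register_function(function_path: str, handler: Callable[[Any], Any]) -> dict:
    """Store the handler locally and build a RegisterFunction-style message."""
    functions[function_path] = handler
    return {"type": "RegisterFunction", "function_path": function_path}


def handle_invoke(raw: str) -> str:
    """Route an InvokeFunction message to its handler and build the reply."""
    message = json.loads(raw)
    reply: dict[str, Any] = {
        "type": "InvocationResult",
        "invocation_id": message.get("invocation_id"),  # assumed to be echoed back
        "result": None,
        "error": None,
    }
    handler = functions.get(message["function_path"])
    if handler is None:
        reply["error"] = f"unknown function: {message['function_path']}"
    else:
        try:
            reply["result"] = handler(message.get("data"))
        except Exception as exc:  # report handler failures instead of crashing
            reply["error"] = str(exc)
    return json.dumps(reply)


register_function("math.double", lambda data: data["n"] * 2)
reply = json.loads(handle_invoke(json.dumps({
    "type": "InvokeFunction",
    "invocation_id": "inv-1",
    "function_path": "math.double",
    "data": {"n": 21},
})))
```

Here reply carries the handler's return value in result and leaves error empty; an unknown function_path or a raising handler fills error instead.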
Multi-Trigger Architecture
Steps can have multiple triggers of different types (API, Event, Cron), all routing to the same handler function. The engine assigns each trigger a unique indexed function path.
Trigger Indexing
Each trigger in a step gets a unique function path using the format: steps.{StepName}:trigger:{index}
Index: Corresponds to the trigger's position in the triggers array (0-based).
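As a small illustration of the indexing scheme, the helper below builds the function path for each trigger in a step. The helper name trigger_function_path and the step name ProcessOrder are made up for the example; only the path format comes from the description above.

```python
def trigger_function_path(step_name: str, index: int) -> str:
    """Build the indexed function path for the trigger at position `index`."""
    if index < 0:
        raise ValueError("trigger index is 0-based and cannot be negative")
    return f"steps.{step_name}:trigger:{index}"


# A step with three triggers gets one path per position in the triggers array.
triggers = ["api", "event", "cron"]
paths = [trigger_function_path("ProcessOrder", i) for i, _ in enumerate(triggers)]
```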
TriggerInput Normalization
All triggers normalize their input to a unified TriggerInput structure before invoking the handler:
interface TriggerInput {
  trigger: TriggerMetadata
  request: ApiRequest | null // Only for API triggers
  data: any // Payload data
}

interface TriggerMetadata {
  type: 'api' | 'event' | 'cron'
  index: number
  // Type-specific fields
  path?: string // API only
  method?: string // API only
  topic?: string // Event only
  expression?: string // Cron only
}

Benefits:
- One handler serves multiple trigger types
- Access trigger metadata via ctx.trigger or the trigger parameter
- Consistent input structure across all trigger types
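The normalization step can be sketched as a single function that maps each trigger's raw payload onto the TriggerInput shape. This is an illustrative sketch only: the function name normalize and the shape of the raw payloads (keys such as "body" and "data") are assumptions, not the SDK's wire format.

```python
from typing import Any, Optional


def normalize(trigger_type: str, index: int, raw: dict[str, Any]) -> dict[str, Any]:
    """Build a TriggerInput-shaped dict from a raw trigger payload (assumed shapes)."""
    trigger: dict[str, Any] = {"type": trigger_type, "index": index}
    request: Optional[dict[str, Any]] = None
    data: Any = None

    if trigger_type == "api":
        trigger["path"] = raw["path"]
        trigger["method"] = raw["method"]
        request = raw  # only API triggers carry a request
        data = raw.get("body")
    elif trigger_type == "event":
        trigger["topic"] = raw["topic"]
        data = raw.get("data")
    elif trigger_type == "cron":
        trigger["expression"] = raw["expression"]
    else:
        raise ValueError(f"unsupported trigger type: {trigger_type}")

    return {"trigger": trigger, "request": request, "data": data}


api_input = normalize("api", 0, {"path": "/orders", "method": "POST", "body": {"id": 1}})
event_input = normalize("event", 1, {"topic": "order.created", "data": {"id": 1}})
cron_input = normalize("cron", 2, {"expression": "0 * * * *"})
```

All three results share the same top-level keys, so one handler can branch on input["trigger"]["type"] when it needs trigger-specific behavior.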
Condition Evaluation
Conditions are functions that run before the handler to determine if execution should proceed. They receive the same TriggerInput as the handler.
Condition Flow
Condition Function Signature
Python:
def condition_name(input: Any, ctx: FlowContext) -> bool:
    # Evaluate based on input data
    return input.get("amount", 0) > 1000

# Or async
async def async_condition(input: Any, ctx: FlowContext) -> bool:
    user = await fetch_user(input.get("user_id"))
    return user.is_verified

TypeScript:
const conditionName = (input: any, ctx: FlowContext): boolean => {
  // Evaluate based on input data
  return input?.amount > 1000
}

// Or async
const asyncCondition = async (input: any, ctx: FlowContext): Promise<boolean> => {
  const user = await fetchUser(input?.userId)
  return user.isVerified
}

Condition Registration
Conditions are registered separately from the main handler:
- Handler Registration: steps.{StepName}:trigger:{index}
- Condition Registration: steps.{StepName}:trigger:{index}:condition
The engine evaluates the condition before invoking the handler. If the condition returns false, the handler is never called.
Short-Circuit Behavior: The first false condition stops execution immediately.
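The evaluation order can be modeled in-process as follows. In the real system the engine performs this check; the sketch only mirrors the described behavior, and it assumes that a trigger may carry a list of conditions and that both sync and async conditions are allowed. The name run_with_conditions and the sample conditions are hypothetical.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Union

Condition = Callable[[Any, Any], Union[bool, Awaitable[bool]]]
Handler = Callable[[Any, Any], Awaitable[Any]]


async def run_with_conditions(
    input: Any, ctx: Any, handler: Handler, conditions: list[Condition]
) -> Any:
    """Run conditions in order; call the handler only if all of them pass."""
    for condition in conditions:
        outcome = condition(input, ctx)
        if inspect.isawaitable(outcome):  # support async conditions
            outcome = await outcome
        if not outcome:
            return None  # short-circuit: later conditions and the handler are skipped
    return await handler(input, ctx)


calls: list[str] = []  # records which functions actually ran


def is_large(input: Any, ctx: Any) -> bool:
    calls.append("is_large")
    return input.get("amount", 0) > 1000


async def is_verified(input: Any, ctx: Any) -> bool:
    calls.append("is_verified")
    return bool(input.get("verified"))


async def handler(input: Any, ctx: Any) -> dict:
    calls.append("handler")
    return {"ok": True}


conditions = [is_large, is_verified]
skipped = asyncio.run(run_with_conditions({"amount": 5}, None, handler, conditions))
ran = asyncio.run(
    run_with_conditions({"amount": 5000, "verified": True}, None, handler, conditions)
)
```

In the first call is_large returns false, so neither is_verified nor the handler runs; in the second call both conditions pass and the handler's result is returned.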
See Examples: For practical examples of multi-trigger steps and conditions in action, see Motia SDK - Multiple Triggers and Trigger Conditions.
Python Implementation
The Python implementation is divided into two layers: the low-level SDK (iii) and the high-level framework (motia).
Low-Level SDK (iii)
The iii package provides the raw Bridge class and type definitions. It handles the async event loop integration using asyncio and websockets.
Installation
pip install iii

Key Components
Basic Usage
from iii import Bridge

# Create and connect bridge
# (await must run inside a coroutine, for example one started with asyncio.run)
bridge = Bridge()
await bridge.connect()

# Register a function
async def my_handler(data):
    return {"result": "success"}

bridge.register_function(
    function_path="service.action",
    handler=my_handler,
    description="My custom function"
)

# Register a trigger
bridge.register_trigger(
    trigger_type="api",
    function_path="service.action",
    config={
        "api_path": "/my-endpoint",
        "http_method": "POST"
    }
)

High-Level Framework (motia)
motia builds upon iii to provide a declarative workflow definition system. It wraps raw function handlers into structured "Steps".
Installation
pip install motia

Step Architecture
A Step in Motia encapsulates configuration (Event, API, Cron) and the handler logic. The step_wrapper function automatically registers the step with the bridge and injects a FlowContext.
Context and State
The FlowContext passed to handlers provides access to:
ctx.trigger: Metadata about which trigger invoked the handler.
# Access trigger information
if ctx.trigger.type == "api":
    print(f"API {ctx.trigger.method} {ctx.trigger.path}")
elif ctx.trigger.type == "event":
    print(f"Event {ctx.trigger.topic}")
elif ctx.trigger.type == "cron":
    print(f"Cron {ctx.trigger.expression}")

# Access trigger index
print(f"Trigger index: {ctx.trigger.index}")

Fields: type, index, path, method, topic, expression (type-specific)

ctx.state: An internal state manager for configuration and internal data.
await ctx.state.set("config", "theme", "dark")
theme = await ctx.state.get("config", "theme")

Streams: Access to distributed streams (get/set/delete).
todo_stream = Stream("todos")
await todo_stream.set("inbox", "item-1", {"title": "Buy milk"})

ctx.logger: Context-aware logging with trace ID.
ctx.logger.info("Processing started")
ctx.logger.error("An error occurred", exc_info=True)

Node.js Implementation
The Node.js SDK (@iii-dev/sdk) utilizes TypeScript to provide type safety and follows a similar bridge pattern but uses callbacks and hooks for integration.
Installation
npm install @iii-dev/sdk

Bridge Implementation
The Node.js Bridge class uses the ws library. It maintains queues for messages sent before the socket is open (messagesToSend) and handles automatic reconnection intervals.
import { createBridge } from '@iii-dev/sdk'

// Create bridge instance
const bridge = createBridge({
  url: 'ws://127.0.0.1:49134',
})

// Connect to engine
await bridge.connect()

// Register a function
bridge.registerFunction({
  function_path: 'service.action',
  handler: async (data) => {
    return { result: 'success' }
  },
  description: 'My custom function',
})

// Register a trigger
bridge.registerTrigger({
  trigger_type: 'api',
  function_path: 'service.action',
  config: {
    api_path: '/my-endpoint',
    http_method: 'POST',
  },
})

Conceptual Structure
// Conceptual structure based on packages/node/iii/src/bridge.ts
class Bridge {
  private ws?: WebSocket
  private functions = new Map<string, RemoteFunctionData>()
  private messagesToSend: BridgeMessage[] = []

  registerFunction(message, handler) {
    this.sendMessage(MessageType.RegisterFunction, message, true)
    this.functions.set(message.function_path, { message, handler })
  }
}

React-like Hooks
The Node ecosystem also includes higher-level abstractions, modeled on React hooks, for defining triggers.
Stream Management
Both SDKs provide abstractions for interacting with the Engine's Stream module, which handles distributed state and grouping.
Stream Operations
Stream operations are invoked over the bridge as functions in the streams namespace (for example, streams.get).
| Operation | SDK Function | Bridge Target | Description |
|---|---|---|---|
| Get | stream.get(group, id) | streams.get | Retrieves an item from a group |
| Set | stream.set(group, id, val) | streams.set | Saves an item to a group |
| Delete | stream.delete(group, id) | streams.delete | Removes an item |
| Get Group | stream.get_group(group) | streams.getGroup | Retrieves all items in a group |
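The mapping in the table can be exercised end to end with a fake bridge. The sketch below is not the SDK's Stream class: InMemoryBridge and SketchStream are hypothetical names, the bridge keeps everything in a local dict instead of talking to the engine, and the "data" payload key used for set is an assumption.

```python
import asyncio
from typing import Any


class InMemoryBridge:
    """Hypothetical fake bridge that serves the streams targets from a dict."""

    def __init__(self) -> None:
        # (stream_name, group_id) -> {item_id: value}
        self._groups: dict[tuple[str, str], dict[str, Any]] = {}

    async def invoke_function(self, function_path: str, data: dict[str, Any]) -> Any:
        group = self._groups.setdefault((data["stream_name"], data["group_id"]), {})
        if function_path == "streams.set":
            group[data["item_id"]] = data["data"]
            return data["data"]
        if function_path == "streams.get":
            return group.get(data["item_id"])
        if function_path == "streams.delete":
            return group.pop(data["item_id"], None)
        if function_path == "streams.getGroup":
            return list(group.values())
        raise ValueError(f"unknown bridge target: {function_path}")


class SketchStream:
    """Thin wrapper that turns method calls into bridge invocations."""

    def __init__(self, stream_name: str, bridge: InMemoryBridge) -> None:
        self.stream_name = stream_name
        self._bridge = bridge

    def _payload(self, group_id: str, **extra: Any) -> dict[str, Any]:
        return {"stream_name": self.stream_name, "group_id": group_id, **extra}

    async def get(self, group_id: str, item_id: str) -> Any:
        return await self._bridge.invoke_function(
            "streams.get", self._payload(group_id, item_id=item_id))

    async def set(self, group_id: str, item_id: str, value: Any) -> Any:
        return await self._bridge.invoke_function(
            "streams.set", self._payload(group_id, item_id=item_id, data=value))

    async def delete(self, group_id: str, item_id: str) -> Any:
        return await self._bridge.invoke_function(
            "streams.delete", self._payload(group_id, item_id=item_id))

    async def get_group(self, group_id: str) -> Any:
        return await self._bridge.invoke_function(
            "streams.getGroup", self._payload(group_id))


async def demo() -> dict[str, Any]:
    todos = SketchStream("todos", InMemoryBridge())
    await todos.set("inbox", "item-1", {"title": "Buy milk"})
    await todos.set("inbox", "item-2", {"title": "Walk dog"})
    fetched = await todos.get("inbox", "item-1")
    await todos.delete("inbox", "item-2")
    remaining = await todos.get_group("inbox")
    return {"fetched": fetched, "remaining": remaining}


result = asyncio.run(demo())
```

Keeping the wrapper this thin means the stream class holds no state of its own; every operation is a remote call keyed by stream name, group, and item.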
Python Stream Class
In Python, the Stream class is generic (Stream[TData]) and lazy-loads the bridge instance to perform RPC calls.
from motia import Stream

# Define stream
todo_stream = Stream("todos")

# Get item (method excerpt from the Stream class)
async def get(self, group_id: str, item_id: str) -> TData | None:
    return await self._get_bridge().invoke_function(
        "streams.get",
        {
            "stream_name": self.stream_name,
            "group_id": group_id,
            "item_id": item_id,
        },
    )

Node.js Stream Usage
import { Stream } from '@iii-dev/sdk'

// Define stream
const todoStream = new Stream('todos')

// Set item
await todoStream.set('inbox', 'item-1', {
  title: 'Buy milk',
  completed: false,
})

// Get item
const item = await todoStream.get('inbox', 'item-1')

// Get all items in group
const allItems = await todoStream.getGroup('inbox')

Request and Response Models
The SDKs standardize how HTTP requests and responses are handled when using API triggers.
API Request
The ApiRequest model normalizes incoming HTTP data.
path_params: Dictionary of URL path parameters.
# Route: /users/:userId/posts/:postId
# Request: GET /users/123/posts/456
req.path_params # {"userId": "123", "postId": "456"}

query_params: Dictionary of query string arguments.
# Request: GET /users?status=active&limit=10
req.query_params # {"status": "active", "limit": "10"}

body: The request payload (parsed JSON).
# POST /users with {"name": "Alice"}
req.body # {"name": "Alice"}

API Response
The ApiResponse model dictates how handlers should return data to the engine to be sent back to the HTTP client.
body: Response payload (will be serialized to JSON).
return ApiResponse(status=200, body={"message": "Success"})

headers: Response headers (optional).
return ApiResponse(
    status=200,
    body={...},
    headers={"X-Custom-Header": "value"}
)

Middleware Composition (Python)
The Motia framework includes logic to compose middleware for API steps. This allows intercepting requests before they reach the main handler.
The _compose_middleware function recursively wraps handlers, creating an execution chain where each middleware can perform actions before and after the next handler is awaited.
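The recursive wrapping can be illustrated with a standalone sketch. It is not Motia's _compose_middleware; the function name compose_middleware, the type aliases, and the tracer helper are assumptions that only reproduce the described pattern, using the (req, ctx, next_handler) middleware signature.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any, Any], Awaitable[Any]]
Middleware = Callable[[Any, Any, Handler], Awaitable[Any]]


def compose_middleware(middleware: list[Middleware], handler: Handler) -> Handler:
    """Recursively wrap `handler` so that middleware[0] runs outermost."""
    if not middleware:
        return handler
    first, rest = middleware[0], middleware[1:]
    next_handler = compose_middleware(rest, handler)  # wrap the remaining chain

    async def wrapped(req: Any, ctx: Any) -> Any:
        return await first(req, ctx, next_handler)

    return wrapped


order: list[str] = []  # records the execution order


def tracer(name: str) -> Middleware:
    """Build a middleware that logs before and after the next handler."""
    async def middleware(req: Any, ctx: Any, next_handler: Handler) -> Any:
        order.append(f"{name}:before")
        response = await next_handler(req, ctx)
        order.append(f"{name}:after")
        return response
    return middleware


async def handler(req: Any, ctx: Any) -> dict:
    order.append("handler")
    return {"status": 200}


chain = compose_middleware([tracer("a"), tracer("b")], handler)
response = asyncio.run(chain({}, {}))
```

After the call, order shows the nesting: each middleware's "before" step runs in list order, the handler runs innermost, and the "after" steps unwind in reverse.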
Example Middleware
from motia import StepConfig, ApiTrigger, ApiRequest, ApiResponse, FlowContext, step_wrapper

# Define middleware
async def auth_middleware(req: ApiRequest, ctx: FlowContext, next_handler):
    # Before handler
    token = req.headers.get("authorization")
    if not token:
        return ApiResponse(status=401, body={"error": "Unauthorized"})

    # Validate token (validate_token is an application-defined helper, not part of motia)
    user = await validate_token(token)
    if not user:
        return ApiResponse(status=401, body={"error": "Invalid token"})

    # Inject into context
    ctx.user = user

    # Call next handler
    response = await next_handler(req, ctx)

    # After handler (optional)
    response.headers["X-Request-ID"] = ctx.trace_id
    return response

# Use middleware in step
config = StepConfig(
    name="protected-route",
    triggers=[
        ApiTrigger(
            path="/protected",
            method="GET",
            middleware=[auth_middleware]
        )
    ]
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # ctx.user is available here
    return ApiResponse(
        status=200,
        body={"message": f"Hello, {ctx.user['name']}"}
    )

step_wrapper(config, __file__, handler)

Complete Examples
Python Example
from iii import Bridge

# Create bridge
bridge = Bridge()

# Register function
async def create_user(data):
    user = {"id": "123", **data}
    return {"status": 201, "body": user}

bridge.register_function(
    function_path="users.create",
    handler=create_user
)

# Register API trigger
bridge.register_trigger(
    trigger_type="api",
    function_path="users.create",
    config={
        "api_path": "/users",
        "http_method": "POST"
    }
)

# Connect and run
await bridge.connect()

Node.js Example
import { createBridge } from '@iii-dev/sdk'

const bridge = createBridge()

// Register function
bridge.registerFunction({
  function_path: 'users.create',
  handler: async (data) => {
    const user = { id: '123', ...data }
    return { status: 201, body: user }
  },
})

// Register API trigger
bridge.registerTrigger({
  trigger_type: 'api',
  function_path: 'users.create',
  config: {
    api_path: '/users',
    http_method: 'POST',
  },
})

// Connect and run
await bridge.connect()