iii
Tutorials

@iii-dev/motia

Build with Motia iii

Motia is a high-level polyglot framework built on top of the iii SDK that provides type safety, declarative configuration, and structured workflows. Steps can be written in JavaScript, TypeScript, or Python.

Why Motia?

While the base iii SDK gives you full control, Motia adds:

  • Polyglot Support: Write steps in JavaScript, TypeScript, or Python
  • Type Safety: Validation and type checking for all configurations
  • Declarative Steps: Define workflows as step files
  • Auto-Discovery: CLI automatically finds and loads steps
  • Flow Context: Built-in state, logging, and event emission
  • Middleware Support: Request/response interceptors for API routes

Installation

JavaScript/TypeScript

npm install @iii-dev/motia
# or
yarn add @iii-dev/motia

Python

uv pip install iii-motia
# or
pip install iii-motia

Quick Start

1. Create a Step File

Steps are the building blocks of Motia, and can be written in JavaScript, TypeScript, or Python.

TypeScript

TypeScript Example (steps/hello.step.ts):

import type { StepConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: StepConfig = {
  name: 'hello',
  triggers: [
    {
      type: 'api',
      path: '/hello/:name',
      method: 'GET',
      responseSchema: {
        200: z.object({
          message: z.string(),
        }),
      },
    },
  ],
}

/**
 * Answers GET /hello/:name with a JSON greeting.
 * Uses "World" when the `name` path parameter is missing or empty.
 */
export const handler: Handlers<typeof config> = async (req, { logger }) => {
  const recipient = req.pathParams.name || 'World'
  logger.info(`Greeting ${recipient}`)

  const body = { message: `Hello, ${recipient}!` }
  return { status: 200, body }
}

Python

Python Example (steps/hello.step.py):

from motia import ApiRequest, ApiResponse, FlowContext, api

config = {
    "name": "hello",
    "description": "Return a greeting",
    "triggers": [api("GET", "/hello/:name")],
    "emits": [],
}

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    name = req.path_params.get("name", "World")

    ctx.logger.info(f"Greeting {name}")

    return ApiResponse(status=200, body={"message": f"Hello, {name}!"})

2. Run Your Application

# Start iii engine (in another terminal)
iii

# Run Motia (defaults to ./steps)
motia run

# Optional: specify directory
motia run --dir steps

3. Test Your Endpoint

curl http://localhost:3111/hello/Alice
# {"message": "Hello, Alice!"}

Step Types

API Steps

Handle HTTP requests with validation and type safety.

TypeScript

import type { StepConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: StepConfig = {
  name: 'create-user',
  triggers: [
    {
      type: 'api',
      path: '/users',
      method: 'POST',
      bodySchema: z.object({
        name: z.string(),
        email: z.string().email(),
      }),
      responseSchema: {
        201: z.object({
          id: z.string(),
          name: z.string(),
          email: z.string(),
        }),
      },
    },
  ],
  emits: ['user.created'],
}

export const handler: Handlers<typeof config> = async (req, { emit, logger }) => {
  const user = await createUser(req.body)

  // Emit event
  await emit({ topic: 'user.created', data: user })

  logger.info('User created', { userId: user.id })

  return { status: 201, body: user }
}

Python

from motia import StepConfig, ApiTrigger, ApiRequest, ApiResponse, FlowContext, step_wrapper

config = StepConfig(
    name="create-user",
    triggers=[
        ApiTrigger(
            path="/users",
            method="POST",
            body_schema={
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "email": {"type": "string", "format": "email"}
                },
                "required": ["name", "email"]
            }
        )
    ],
    emits=["user.created"]
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    user = await create_user(req.body)

    # Emit event
    await ctx.emit("user.created", user)

    return ApiResponse(status=201, body=user)

step_wrapper(config, __file__, handler)

Event Steps

React to events from the event bus.

TypeScript

import type { StepConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: StepConfig = {
  name: 'send-welcome-email',
  triggers: [
    {
      type: 'event',
      subscribes: ['user.created'],
      input: z.object({
        id: z.string(),
        name: z.string(),
        email: z.string(),
      }),
    },
  ],
}

export const handler: Handlers<typeof config> = async (data, { logger }) => {
  logger.info(`Sending welcome email to ${data.email}`)
  await sendEmail(data.email, 'Welcome!')
}

Python

from motia import StepConfig, EventTrigger, FlowContext, step_wrapper

config = StepConfig(
    name="send-welcome-email",
    triggers=[
        EventTrigger(subscribes=["user.created"])
    ]
)

async def handler(data, ctx: FlowContext):
    ctx.logger.info(f"Sending welcome email to {data['email']}")
    await send_email(data['email'], "Welcome!")

step_wrapper(config, __file__, handler)

Cron Steps

Run tasks on a schedule using cron expressions.

TypeScript

import type { StepConfig, Handlers } from '@iii-dev/motia'

export const config: StepConfig = {
  name: 'daily-report',
  triggers: [
    {
      type: 'cron',
      cron: '0 9 * * *', // Every day at 9 AM
    },
  ],
  emits: ['report.generated'],
}

export const handler: Handlers<typeof config> = async (data, { emit, logger }) => {
  const report = await generateDailyReport()
  await emit({ topic: 'report.generated', data: report })
  logger.info('Daily report generated')
}

Python

from motia import StepConfig, CronTrigger, FlowContext, step_wrapper

config = StepConfig(
    name="daily-report",
    triggers=[
        CronTrigger(cron="0 9 * * *")  # Every day at 9 AM
    ],
    emits=["report.generated"]
)

async def handler(data, ctx: FlowContext):
    report = await generate_daily_report()
    await ctx.emit("report.generated", report)
    ctx.logger.info("Daily report generated")

step_wrapper(config, __file__, handler)

Multiple Triggers

A single step can respond to multiple trigger types, allowing one handler to process requests from different sources.

TypeScript

import type { StepConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: StepConfig = {
  name: 'process-order',
  triggers: [
    {
      type: 'api',
      path: '/orders/manual',
      method: 'POST',
    },
    {
      type: 'event',
      subscribes: ['order.created'],
    },
  ],
  emits: ['order.processed'],
}

/**
 * Shared handler for both triggers of `process-order`.
 * Picks the order payload based on which trigger fired, processes it,
 * emits `order.processed`, and replies only when invoked over HTTP.
 */
export const handler: Handlers<typeof config> = async (input, { trigger, logger, emit }) => {
  let orderData

  switch (trigger.type) {
    case 'api':
      // API trigger: the order is carried in the request body
      logger.info('Order received via API')
      orderData = input.body
      break
    case 'event':
      // Event trigger: the input itself is the order
      logger.info('Order received via event')
      orderData = input
      break
  }

  await processOrder(orderData)
  await emit({ topic: 'order.processed', data: orderData })

  // Only the API trigger produces a response
  if (trigger.type === 'api') {
    return { status: 200, body: { success: true } }
  }
}

Python

from motia import StepConfig, ApiTrigger, EventTrigger, ApiRequest, ApiResponse, FlowContext, step_wrapper

config = StepConfig(
    name="process-order",
    triggers=[
        ApiTrigger(path="/orders/manual", method="POST"),
        EventTrigger(subscribes=["order.created"])
    ],
    emits=["order.processed"]
)

async def handler(input, ctx: FlowContext):
    """Process an order arriving through either trigger.

    For the API trigger the order is read from ``input.body``; for the event
    trigger ``input`` itself is the order. Only the API path returns a response.
    """
    via_api = ctx.trigger.type == "api"

    if via_api:
        ctx.logger.info("Order received via API")
        order_data = input.body
    elif ctx.trigger.type == "event":
        ctx.logger.info("Order received via event")
        order_data = input

    await process_order(order_data)
    await ctx.emit("order.processed", order_data)

    return ApiResponse(status=200, body={"success": True}) if via_api else None

step_wrapper(config, __file__, handler)

Trigger Metadata: Access trigger.type, trigger.index, and type-specific fields like trigger.path, trigger.method, or trigger.topic to determine which trigger invoked the handler.

Trigger Conditions

Conditions allow you to filter trigger executions based on input data. Conditions are evaluated before the handler runs, and if they return false, the handler is skipped.

TypeScript

import type { StepConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

/**
 * Trigger condition: let the handler run only for amounts above 1000.
 *
 * Accepts either an API-style input (`{ body: { amount } }`) or a bare payload
 * (`{ amount }`). A missing, null, or non-numeric amount never qualifies.
 *
 * @param input - The trigger input; its shape is checked at runtime.
 * @param _ctx - The flow context; accepted for signature compatibility but unused.
 * @returns `true` when the amount is a number greater than 1000.
 */
const isHighValue = (input: unknown, _ctx?: unknown): boolean => {
  // Nullish input is treated as an empty payload so property access cannot throw
  const payload = (input ?? {}) as { body?: { amount?: unknown } | null; amount?: unknown }

  // Prefer the request body's amount, falling back to a top-level amount
  const amount = payload.body?.amount ?? payload.amount

  return typeof amount === 'number' && amount > 1000
}

export const config: StepConfig = {
  name: 'process-payment',
  triggers: [
    {
      type: 'api',
      path: '/payments',
      method: 'POST',
      condition: isHighValue,
      bodySchema: z.object({
        amount: z.number(),
        currency: z.string(),
      }),
    },
  ],
}

export const handler: Handlers<typeof config> = async (req, { logger }) => {
  logger.info(`Processing high-value payment: ${req.body.amount}`)
  await processPayment(req.body)
  return { status: 200, body: { processed: true } }
}

Python

from motia import StepConfig, EventTrigger, FlowContext, step_wrapper

def is_high_value(input, ctx: FlowContext) -> bool:
    """Trigger condition: accept only payloads whose ``amount`` exceeds 1000.

    Non-dict inputs never qualify; a dict without an ``amount`` key counts as 0.
    """
    if not isinstance(input, dict):
        return False
    return input.get("amount", 0) > 1000

config = StepConfig(
    name="process-high-value-order",
    triggers=[
        EventTrigger(
            subscribes=["order.created"],
            condition=is_high_value
        )
    ]
)

async def handler(data, ctx: FlowContext):
    ctx.logger.info(f"Processing high-value order: {data['amount']}")
    await process_order(data)

step_wrapper(config, __file__, handler)

Conditions with Multiple Triggers

Each trigger in a multi-trigger step can have its own condition:

from motia import StepConfig, ApiTrigger, EventTrigger, ApiResponse, FlowContext, step_wrapper

def is_verified_user(input, ctx: FlowContext) -> bool:
    """Condition for the API trigger: require the ``x-verified: true`` header."""
    if ctx.trigger.type == "api":
        return input.headers.get("x-verified") == "true"
    return True

def is_high_value(input, ctx: FlowContext) -> bool:
    """Condition for the event trigger: require an ``amount`` above 1000."""
    amount = input.get("amount", 0) if isinstance(input, dict) else 0
    return amount > 1000

config = StepConfig(
    name="process-order",
    triggers=[
        ApiTrigger(
            path="/orders/manual",
            method="POST",
            condition=is_verified_user
        ),
        EventTrigger(
            subscribes=["order.created"],
            condition=is_high_value
        )
    ],
    emits=["order.processed"]
)

async def handler(input, ctx: FlowContext):
    """Process an order from whichever trigger passed its condition."""
    via_api = ctx.trigger.type == "api"
    ctx.logger.info(f"Processing order via {ctx.trigger.type}")

    # API input is the request object, so the order is its body;
    # event input is the order payload itself
    order_data = input.body if via_api else input

    await process_order(order_data)
    await ctx.emit("order.processed", order_data)

    # Only the API trigger returns a response
    if via_api:
        return ApiResponse(status=200, body={"success": True})

step_wrapper(config, __file__, handler)

How Conditions Work:

  • Conditions receive the same arguments as the handler: the trigger input and the flow context
  • They must return a boolean: true (True in Python) to execute, false (False in Python) to skip
  • Conditions can be sync or async functions
  • If a condition returns false, the handler is not invoked for that trigger

Learn More: For technical details on trigger indexing, TriggerInput normalization, and condition evaluation flow, see SDK Implementation.

Flow Context

Every handler receives a FlowContext with runtime utilities:

Naming Conventions: TypeScript uses camelCase (traceId, pathParams) while Python uses snake_case (trace_id, path_params). The examples below show both styles.

emit
function

Emit events to the event bus.

TypeScript:

await emit({ topic: 'topic.name', data: { key: 'value' } })

Python:

await ctx.emit("topic.name", {"key": "value"})

trigger
TriggerMetadata

Metadata about which trigger invoked the handler. Useful for multi-trigger steps.

TypeScript:

if (trigger.type === 'api') {
  console.log(`API ${trigger.method} ${trigger.path}`)
} else if (trigger.type === 'event') {
  console.log(`Event ${trigger.topic}`)
}

Python:

if ctx.trigger.type == "api":
  ctx.logger.info(f"API {ctx.trigger.method} {ctx.trigger.path}")
elif ctx.trigger.type == "event":
  ctx.logger.info(f"Event {ctx.trigger.topic}")

Fields: type (api/event/cron), index (trigger position), plus type-specific fields like path, method, topic, or expression.

traceId / trace_id
string

Distributed tracing identifier for correlating logs.

TypeScript:

logger.info(`[${traceId}] Processing request`)

Python:

ctx.logger.info(f"[{ctx.trace_id}] Processing request")

state
InternalStateManager

Internal state management (uses $$internal-state stream).

TypeScript:

await state.set('config', 'theme', 'dark')
const theme = await state.get('config', 'theme')

Python:

await ctx.state.set("config", "theme", "dark")
theme = await ctx.state.get("config", "theme")

streams
Streams

Access to stream instances.

TypeScript:

await streams.todos.set('inbox', 'item-1', { title: 'Buy milk' })

Python:

todo_stream = Stream("todos")
await todo_stream.set("inbox", "item-1", {"title": "Buy milk"})

logger
Logger

Context-aware logger with trace ID.

TypeScript:

logger.info('Processing started')
logger.error('An error occurred')

Python:

ctx.logger.info("Processing started")
ctx.logger.error("An error occurred", exc_info=True)

Streams and State

Using Streams

Streams provide distributed state management:

from motia import Stream, ApiRequest, ApiResponse, FlowContext, step_wrapper

# Define stream
todo_stream = Stream("todos")

async def create_todo(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Build a todo from the request body's ``title`` and store it in the "inbox" group."""
    new_id = generate_id()
    record = {"id": new_id, "title": req.body["title"], "completed": False}

    # Stored under stream "todos" -> group "inbox" -> item <new_id>
    await todo_stream.set("inbox", new_id, record)

    return ApiResponse(status=201, body=record)

async def get_todos(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Respond with everything stored in the "inbox" group of the todos stream."""
    inbox_items = await todo_stream.get_group("inbox")
    return ApiResponse(status=200, body=inbox_items)

Stream Hierarchy:

  • Stream Name: todos (top-level namespace)
  • Group ID: inbox (partition within stream)
  • Item ID: todo-123 (unique identifier)
  • Data: The actual JSON payload

Internal State

Use ctx.state for configuration and internal data:

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Walk through the ``ctx.state`` operations: set, get, get_group, and delete."""
    # Set state
    await ctx.state.set("user-prefs", "theme", "dark")

    # Get state
    theme = await ctx.state.get("user-prefs", "theme")

    # Get all in group
    prefs = await ctx.state.get_group("user-prefs")

    # Delete
    await ctx.state.delete("user-prefs", "theme")

    # Return the values read above so the handler honours its ApiResponse return type
    return ApiResponse(status=200, body={"theme": theme, "prefs": prefs})

Middleware

Add middleware for authentication, logging, or validation:

from motia import StepConfig, ApiTrigger, ApiRequest, ApiResponse, FlowContext, step_wrapper

# Define middleware
async def auth_middleware(req: ApiRequest, ctx: FlowContext, next_handler):
    """Reject requests lacking a valid ``authorization`` header; otherwise run the next handler.

    On success the validated user is attached to ``ctx.user`` before delegating.
    """
    def reject(reason: str) -> ApiResponse:
        # Both failure modes answer with HTTP 401 and an "error" message
        return ApiResponse(status=401, body={"error": reason})

    auth_header = req.headers.get("authorization")
    if not auth_header:
        return reject("Unauthorized")

    account = await validate_token(auth_header)
    if not account:
        return reject("Invalid token")

    # Expose the authenticated user to the downstream handler
    ctx.user = account
    return await next_handler(req, ctx)

# Use middleware in step
config = StepConfig(
    name="protected-route",
    triggers=[
        ApiTrigger(
            path="/protected",
            method="GET",
            middleware=[auth_middleware]  # Add middleware
        )
    ]
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # ctx.user is available here
    return ApiResponse(
        status=200,
        body={"message": f"Hello, {ctx.user['name']}"}
    )

step_wrapper(config, __file__, handler)

Infrastructure Configuration

Define compute resources and queue behavior for your steps:

from motia import StepConfig, EventTrigger, InfrastructureConfig, HandlerConfig, QueueConfig, FlowContext, step_wrapper

config = StepConfig(
    name="heavy-processing",
    triggers=[
        EventTrigger(subscribes=["data.uploaded"])
    ],
    infrastructure=InfrastructureConfig(
        handler=HandlerConfig(
            ram=512,      # 512 MB memory
            cpu=2,        # 2 CPU cores
            timeout=300   # 5 minute timeout
        ),
        queue=QueueConfig(
            type="fifo",                  # FIFO queue (or "standard")
            max_retries=5,                # Retry 5 times on failure
            concurrency=10,               # Process 10 messages concurrently
            backoff_type="exponential",   # "fixed" or "exponential" retry backoff
            backoff_delay_ms=1000,        # Initial backoff delay (1 second)
            visibility_timeout=60,        # 1 minute visibility timeout
            delay_seconds=10              # 10 second initial delay
        )
    )
)

async def handler(data, ctx: FlowContext):
    # Heavy processing logic
    await process_large_file(data)

step_wrapper(config, __file__, handler)

Queue Configuration Notes:

  • Queue configuration applies to event triggers only (not API or Cron)
  • Settings are per-subscription, meaning each event topic can have different queue behavior
  • backoff_type: Controls retry delay strategy - "fixed" for constant delays or "exponential" for increasing delays
  • concurrency: Maximum number of messages processed simultaneously for this subscription
  • Queue settings override adapter defaults when specified

CLI Commands

Project Structure

Organize your Motia project. Steps can use .ts, .js, or .py extensions:

TypeScript/JavaScript Project:

my-app/
├── steps/
│   ├── api/
│   │   ├── users.step.ts
│   │   └── orders.step.ts
│   ├── events/
│   │   ├── notifications.step.ts
│   │   └── analytics.step.ts
│   └── cron/
│       └── cleanup.step.ts
├── config.yaml          # Engine configuration
├── package.json
└── tsconfig.json

Python Project:

my-app/
├── steps/
│   ├── api/
│   │   ├── users.step.py
│   │   └── orders.step.py
│   ├── events/
│   │   ├── notifications.step.py
│   │   └── analytics.step.py
│   └── cron/
│       └── cleanup.step.py
├── config.yaml          # Engine configuration
├── pyproject.toml
└── uv.lock

Complete Example: Todo API

This example shows a simple Todo API with create, list, and notification functionality.

TypeScript

steps/create_todo.step.ts

import type { StepConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

const todoSchema = z.object({
  id: z.string(),
  title: z.string(),
  completed: z.boolean(),
})

export const config: StepConfig = {
  name: 'create-todo',
  triggers: [
    {
      type: 'api',
      path: '/todos',
      method: 'POST',
      bodySchema: z.object({
        title: z.string(),
      }),
      responseSchema: {
        201: todoSchema,
      },
    },
  ],
  emits: ['todo.created'],
}

/**
 * Creates a todo from the request body's `title`, stores it in the `todos`
 * stream under the 'default' group, emits `todo.created`, and replies with
 * 201 and the new todo.
 */
export const handler: Handlers<typeof config> = async (req, { streams, emit, logger }) => {
  // ID = millisecond timestamp plus a short base-36 random suffix
  const randomSuffix = Math.random().toString(36).substring(2, 9)
  const todoId = `todo-${Date.now()}-${randomSuffix}`
  const todo = { id: todoId, title: req.body.title, completed: false }

  await streams.todos.set('default', todoId, todo)
  await emit({ topic: 'todo.created', data: todo })
  logger.info('Created todo', { todoId })

  return { status: 201, body: todo }
}

steps/list_todos.step.ts

import type { StepConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: StepConfig = {
  name: 'list-todos',
  triggers: [
    {
      type: 'api',
      path: '/todos',
      method: 'GET',
      responseSchema: {
        200: z.array(
          z.object({
            id: z.string(),
            title: z.string(),
            completed: z.boolean(),
          }),
        ),
      },
    },
  ],
}

export const handler: Handlers<typeof config> = async (req, { streams }) => {
  const todos = await streams.todos.getGroup('default')
  return { status: 200, body: todos }
}

steps/notify_todo.step.ts

import type { StepConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: StepConfig = {
  name: 'notify-todo-created',
  triggers: [
    {
      type: 'event',
      subscribes: ['todo.created'],
      input: z.object({
        id: z.string(),
        title: z.string(),
        completed: z.boolean(),
      }),
    },
  ],
}

export const handler: Handlers<typeof config> = async (data, { logger }) => {
  logger.info('New todo created', { title: data.title })
  // Send notification logic here
}

Python

steps/create_todo.step.py

from motia import StepConfig, ApiTrigger, ApiRequest, ApiResponse, FlowContext, Stream, step_wrapper

todo_stream = Stream("todos")

config = StepConfig(
    name="create-todo",
    triggers=[
        ApiTrigger(path="/todos", method="POST")
    ],
    emits=["todo.created"]
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Create a todo, store it in the "default" group, and emit ``todo.created``."""
    from uuid import uuid4

    new_id = str(uuid4())
    record = {"id": new_id, "title": req.body["title"], "completed": False}

    await todo_stream.set("default", new_id, record)
    await ctx.emit("todo.created", record)
    ctx.logger.info(f"Created todo: {new_id}")

    return ApiResponse(status=201, body=record)

step_wrapper(config, __file__, handler)

steps/list_todos.step.py

from motia import StepConfig, ApiTrigger, ApiRequest, ApiResponse, FlowContext, Stream, step_wrapper

todo_stream = Stream("todos")

config = StepConfig(
    name="list-todos",
    triggers=[
        ApiTrigger(path="/todos", method="GET")
    ]
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    todos = await todo_stream.get_group("default")
    return ApiResponse(status=200, body=todos)

step_wrapper(config, __file__, handler)

steps/notify_todo.step.py

from motia import StepConfig, EventTrigger, FlowContext, step_wrapper

config = StepConfig(
    name="notify-todo-created",
    triggers=[
        EventTrigger(subscribes=["todo.created"])
    ]
)

async def handler(data, ctx: FlowContext):
    ctx.logger.info(f"New todo created: {data['title']}")
    # Send notification logic here

step_wrapper(config, __file__, handler)

Best Practices

Next Steps

On this page