iii
Tutorials

Motia Framework

Motia is a high-level polyglot framework built on top of the iii SDK that provides type safety, declarative configuration, and structured workflows. Steps can be written in JavaScript, TypeScript, or Python.

Why Motia?

While the base iii SDK gives you full control, Motia adds:

  • Polyglot Support: Write steps in JavaScript, TypeScript, or Python
  • Type Safety: Validation and type checking for all configurations
  • Declarative Steps: Define workflows as step files
  • Auto-Discovery: CLI automatically finds and loads steps
  • Flow Context: Built-in state, logging, and event emission
  • Middleware Support: Request/response interceptors for API routes

Installation

JavaScript/TypeScript

npm install @iii-dev/motia
# or
yarn add @iii-dev/motia

Python

pip install motia

Quick Start

1. Create a Step File

Steps are the building blocks of Motia, and can be written in JavaScript, TypeScript, or Python.

TypeScript

TypeScript Example (steps/hello.step.ts):

import type { ApiRouteConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: ApiRouteConfig = {
  type: 'api',
  name: 'hello',
  path: '/hello/:name',
  method: 'GET',
  responseSchema: {
    200: z.object({
      message: z.string(),
    }),
  },
}

/**
 * Greets the caller named by the `:name` path parameter.
 *
 * Falls back to 'World' when the parameter is missing or empty, logs the
 * greeting, and responds with status 200 and a `{ message }` body.
 */
export const handler: Handlers<typeof config> = async (req, ctx) => {
  const { logger } = ctx
  const { name: requested } = req.pathParams
  const name = requested || 'World'

  logger.info(`Greeting ${name}`)

  const message = `Hello, ${name}!`
  return { status: 200, body: { message } }
}

Python

Python Example (steps/hello.step.py):

from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, step_wrapper

# Define step configuration
config = ApiRouteConfig(
    type="api",
    name="hello",
    path="/hello/:name",
    method="GET"
)

# Define handler
async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    name = req.path_params.get("name", "World")

    ctx.logger.info(f"Greeting {name}")

    return ApiResponse(
        status=200,
        body={"message": f"Hello, {name}!"}
    )

# Register step
step_wrapper(config, __file__, handler)

2. Run Your Application

# Start iii engine (in another terminal)
iii

# Run Motia
motia run --dir steps

3. Test Your Endpoint

curl http://localhost:3111/hello/Alice
# {"message": "Hello, Alice!"}

Step Types

API Steps

Handle HTTP requests with validation and type safety.

TypeScript

import type { ApiRouteConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: ApiRouteConfig = {
  type: 'api',
  name: 'create-user',
  path: '/users',
  method: 'POST',
  bodySchema: z.object({
    name: z.string(),
    email: z.string().email(),
  }),
  responseSchema: {
    201: z.object({
      id: z.string(),
      name: z.string(),
      email: z.string(),
    }),
  },
  emits: ['user-created'],
}

export const handler: Handlers<typeof config> = async (req, { emit, logger }) => {
  const user = await createUser(req.body)

  // Emit event
  await emit({ topic: 'user-created', data: user })

  logger.info('User created', { userId: user.id })

  return { status: 201, body: user }
}

Python

from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, step_wrapper

# Route definition for POST /users. body_schema is a JSON Schema describing the
# request body: an object with required "name" and "email" string properties.
config = ApiRouteConfig(
    type="api",
    name="create-user",
    path="/users",
    method="POST",
    body_schema={
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "email": {"type": "string", "format": "email"}
        },
        "required": ["name", "email"]
    },
    # Declare the topic the handler emits
    emits=["user.created"]
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    user = await create_user(req.body)

    # Emit event
    await ctx.emit("user.created", user)

    return ApiResponse(status=201, body=user)

step_wrapper(config, __file__, handler)

Event Steps

React to events from the event bus.

TypeScript

import type { EventConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: EventConfig = {
  type: 'event',
  name: 'send-welcome-email',
  subscribes: ['user-created'],
  input: z.object({
    id: z.string(),
    name: z.string(),
    email: z.string(),
  }),
}

export const handler: Handlers<typeof config> = async (data, { logger }) => {
  logger.info(`Sending welcome email to ${data.email}`)
  await sendEmail(data.email, 'Welcome!')
}

Python

from motia import EventConfig, FlowContext, step_wrapper

config = EventConfig(
    type="event",
    name="send-welcome-email",
    subscribes=["user.created"]
)

async def handler(data, ctx: FlowContext):
    ctx.logger.info(f"Sending welcome email to {data['email']}")
    await send_email(data['email'], "Welcome!")

step_wrapper(config, __file__, handler)

Cron Steps

Run tasks on a schedule using cron expressions.

TypeScript

import type { CronConfig, Handlers } from '@iii-dev/motia'

export const config: CronConfig = {
  type: 'cron',
  name: 'daily-report',
  cron: '0 9 * * *', // Every day at 9 AM
  emits: ['report-generated'],
}

export const handler: Handlers<typeof config> = async ({ emit, logger }) => {
  const report = await generateDailyReport()
  await emit({ topic: 'report-generated', data: report })
  logger.info('Daily report generated')
}

Python

from motia import CronConfig, FlowContext, step_wrapper

config = CronConfig(
    type="cron",
    name="daily-report",
    cron="0 9 * * *",  # Every day at 9 AM
    emits=["report.generated"]
)

async def handler(data, ctx: FlowContext):
    report = await generate_daily_report()
    await ctx.emit("report.generated", report)
    ctx.logger.info("Daily report generated")

step_wrapper(config, __file__, handler)

Flow Context

Every handler receives a FlowContext with runtime utilities:

Naming Conventions: TypeScript uses camelCase (traceId, pathParams) while Python uses snake_case (trace_id, path_params). The examples below show both styles.

emit
function

Emit events to the event bus.

TypeScript:

await emit({ topic: 'topic-name', data: { key: 'value' } })

Python:

await ctx.emit("topic.name", {"key": "value"})
traceId / trace_id
string

Distributed tracing identifier for correlating logs.

TypeScript:

logger.info(`[${traceId}] Processing request`)

Python:

ctx.logger.info(f"[{ctx.trace_id}] Processing request")
state
InternalStateManager

Internal state management (uses $$internal-state stream).

TypeScript:

await state.set('config', 'theme', 'dark')
const theme = await state.get('config', 'theme')

Python:

await ctx.state.set("config", "theme", "dark")
theme = await ctx.state.get("config", "theme")
streams
Streams

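Access to stream instances. In TypeScript, streams are reached through the context (for example, streams.todos); the Python example instead constructs a Stream by name.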

TypeScript:

await streams.todos.set('inbox', 'item-1', { title: 'Buy milk' })

Python:

todo_stream = Stream("todos")
await todo_stream.set("inbox", "item-1", {"title": "Buy milk"})
logger
Logger

Context-aware logger with trace ID.

TypeScript:

logger.info('Processing started')
logger.error('An error occurred')

Python:

ctx.logger.info("Processing started")
ctx.logger.error("An error occurred", exc_info=True)

Streams and State

Using Streams

Streams provide distributed state management. The following Python example stores and lists todos:

from motia import Stream, ApiRequest, ApiResponse, FlowContext, step_wrapper

# Define stream
todo_stream = Stream("todos")

async def create_todo(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Create a todo from the request body and store it in the "inbox" group.

    Responds with status 201 and the stored todo as the body.
    """
    # Local import keeps this snippet self-contained
    import uuid

    todo_id = str(uuid.uuid4())
    todo = {
        "id": todo_id,
        "title": req.body["title"],
        "completed": False
    }

    # Store in stream
    await todo_stream.set("inbox", todo_id, todo)

    return ApiResponse(status=201, body=todo)

async def get_todos(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # Get all todos in group
    todos = await todo_stream.get_group("inbox")
    return ApiResponse(status=200, body=todos)

Stream Hierarchy:

  • Stream Name: todos (top-level namespace)
  • Group ID: inbox (partition within stream)
  • Item ID: todo-123 (unique identifier)
  • Data: The actual JSON payload

Internal State

Use ctx.state for configuration and internal data (Python example):

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # Set state
    await ctx.state.set("user-prefs", "theme", "dark")

    # Get state
    theme = await ctx.state.get("user-prefs", "theme")

    # Get all in group
    prefs = await ctx.state.get_group("user-prefs")

    # Delete
    await ctx.state.delete("user-prefs", "theme")

Middleware

Add middleware for authentication, logging, or validation. The Python example below checks an authorization token before the handler runs:

from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, step_wrapper

# Define middleware
async def auth_middleware(req: ApiRequest, ctx: FlowContext, next_handler):
    """Let a request through only when it carries a valid authorization token.

    Responds with 401 when the "authorization" header is absent or empty, or
    when validate_token returns a falsy value. Otherwise stores the result on
    ctx.user and returns whatever next_handler(req, ctx) produces.
    """
    auth_header = req.headers.get("authorization")
    if auth_header:
        user = await validate_token(auth_header)
        if user:
            # Expose the authenticated user to the downstream handler
            ctx.user = user
            return await next_handler(req, ctx)
        return ApiResponse(status=401, body={"error": "Invalid token"})

    return ApiResponse(status=401, body={"error": "Unauthorized"})

# Use middleware in step
config = ApiRouteConfig(
    type="api",
    name="protected-route",
    path="/protected",
    method="GET",
    middleware=[auth_middleware]  # Add middleware
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # ctx.user is available here
    return ApiResponse(
        status=200,
        body={"message": f"Hello, {ctx.user['name']}"}
    )

step_wrapper(config, __file__, handler)

Infrastructure Configuration

Define compute resources and queue behavior for your steps (Python example):

from motia import EventConfig, InfrastructureConfig, HandlerConfig, QueueConfig, step_wrapper

config = EventConfig(
    type="event",
    name="heavy-processing",
    subscribes=["data.uploaded"],
    infrastructure=InfrastructureConfig(
        handler=HandlerConfig(
            ram=512,      # 512 MB memory
            cpu=2,        # 2 CPU cores
            timeout=300   # 5 minute timeout
        ),
        queue=QueueConfig(
            type="fifo",            # FIFO queue
            max_retries=5,          # Retry 5 times
            visibility_timeout=60,  # 1 minute visibility
            delay_seconds=10        # 10 second delay
        )
    )
)

async def handler(data, ctx: FlowContext):
    # Heavy processing logic
    await process_large_file(data)

step_wrapper(config, __file__, handler)

CLI Commands

Project Structure

Organize your Motia project by grouping steps into folders. Step files are named *.step.ts, *.step.js, or *.step.py:

TypeScript/JavaScript Project:

my-app/
├── steps/
│   ├── api/
│   │   ├── users.step.ts
│   │   └── orders.step.ts
│   ├── events/
│   │   ├── notifications.step.ts
│   │   └── analytics.step.ts
│   └── cron/
│       └── cleanup.step.ts
├── config.yaml          # Engine configuration
├── package.json
└── tsconfig.json

Python Project:

my-app/
├── steps/
│   ├── api/
│   │   ├── users.step.py
│   │   └── orders.step.py
│   ├── events/
│   │   ├── notifications.step.py
│   │   └── analytics.step.py
│   └── cron/
│       └── cleanup.step.py
├── config.yaml          # Engine configuration
└── requirements.txt

Complete Example: Todo API

This example shows a simple Todo API with create, list, and notification functionality.

TypeScript

steps/create_todo.step.ts

import type { ApiRouteConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

const todoSchema = z.object({
  id: z.string(),
  title: z.string(),
  completed: z.boolean(),
})

export const config: ApiRouteConfig = {
  type: 'api',
  name: 'create-todo',
  path: '/todos',
  method: 'POST',
  bodySchema: z.object({
    title: z.string(),
  }),
  responseSchema: {
    201: todoSchema,
  },
  emits: ['todo-created'],
}

/**
 * Creates a todo from the request body's `title`, stores it in the `todos`
 * stream under the 'default' group, emits 'todo-created', and responds with
 * status 201 and the new todo.
 */
export const handler: Handlers<typeof config> = async (req, ctx) => {
  const { streams, emit, logger } = ctx

  // Timestamp plus a short random base-36 suffix
  const todoId = `todo-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`
  const { title } = req.body
  const todo = { id: todoId, title, completed: false }

  await streams.todos.set('default', todoId, todo)
  await emit({ topic: 'todo-created', data: todo })
  logger.info('Created todo', { todoId })

  return { status: 201, body: todo }
}

steps/list_todos.step.ts

import type { ApiRouteConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: ApiRouteConfig = {
  type: 'api',
  name: 'list-todos',
  path: '/todos',
  method: 'GET',
  responseSchema: {
    200: z.array(
      z.object({
        id: z.string(),
        title: z.string(),
        completed: z.boolean(),
      }),
    ),
  },
}

/** Responds with status 200 and every item in the 'default' group of the `todos` stream. */
export const handler: Handlers<typeof config> = async (_req, { streams }) => ({
  status: 200,
  body: await streams.todos.getGroup('default'),
})

steps/notify_todo.step.ts

import type { EventConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'

export const config: EventConfig = {
  type: 'event',
  name: 'notify-todo-created',
  subscribes: ['todo-created'],
  input: z.object({
    id: z.string(),
    title: z.string(),
    completed: z.boolean(),
  }),
}

export const handler: Handlers<typeof config> = async (data, { logger }) => {
  logger.info('New todo created', { title: data.title })
  // Send notification logic here
}

Python

steps/create_todo.step.py

from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, Stream, step_wrapper

todo_stream = Stream("todos")

# Route definition for POST /todos.
config = ApiRouteConfig(
    type="api",
    name="create-todo",
    path="/todos",
    method="POST",
    # The handler reads req.body["title"], so describe it as a required string
    body_schema={
        "type": "object",
        "properties": {
            "title": {"type": "string"}
        },
        "required": ["title"]
    },
    # Declare the topic the handler emits after storing the todo
    emits=["todo.created"]
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Store a new todo in the "default" group and announce it.

    Builds the todo from the request body's "title" with a fresh UUID as its
    id, writes it to todo_stream, emits "todo.created" with the todo as
    payload, logs the id, and responds with status 201 and the todo.
    """
    import uuid

    todo_id = str(uuid.uuid4())
    title = req.body["title"]
    todo = {"id": todo_id, "title": title, "completed": False}

    await todo_stream.set("default", todo_id, todo)
    await ctx.emit("todo.created", todo)
    ctx.logger.info(f"Created todo: {todo_id}")

    response = ApiResponse(status=201, body=todo)
    return response

step_wrapper(config, __file__, handler)

steps/list_todos.step.py

from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, Stream, step_wrapper

todo_stream = Stream("todos")

config = ApiRouteConfig(
    type="api",
    name="list-todos",
    path="/todos",
    method="GET"
)

async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
    todos = await todo_stream.get_group("default")
    return ApiResponse(status=200, body=todos)

step_wrapper(config, __file__, handler)

steps/notify_todo.step.py

from motia import EventConfig, FlowContext, step_wrapper

config = EventConfig(
    type="event",
    name="notify-todo-created",
    subscribes=["todo.created"]
)

async def handler(data, ctx: FlowContext):
    ctx.logger.info(f"New todo created: {data['title']}")
    # Send notification logic here

step_wrapper(config, __file__, handler)

Best Practices

Next Steps

On this page