Motia Framework
Motia is a high-level polyglot framework built on top of the iii SDK that provides type safety, declarative configuration, and structured workflows. Steps can be written in JavaScript, TypeScript, or Python.
Why Motia?
While the base iii SDK gives you full control, Motia adds:
- Polyglot Support: Write steps in JavaScript, TypeScript, or Python
- Type Safety: Validation and type checking for all configurations
- Declarative Steps: Define workflows as step files
- Auto-Discovery: CLI automatically finds and loads steps
- Flow Context: Built-in state, logging, and event emission
- Middleware Support: Request/response interceptors for API routes
Installation
JavaScript/TypeScript
npm install @iii-dev/motia
# or
yarn add @iii-dev/motia
Python
pip install motia
Quick Start
1. Create a Step File
Steps are the building blocks of Motia, and can be written in JavaScript, TypeScript, or Python.
TypeScript
TypeScript Example (steps/hello.step.ts):
import type { ApiRouteConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'
export const config: ApiRouteConfig = {
type: 'api',
name: 'hello',
path: '/hello/:name',
method: 'GET',
responseSchema: {
200: z.object({
message: z.string(),
}),
},
}
export const handler: Handlers<typeof config> = async (req, { logger }) => {
const name = req.pathParams.name || 'World'
logger.info(`Greeting ${name}`)
return {
status: 200,
body: { message: `Hello, ${name}!` },
}
}
Python
Python Example (steps/hello.step.py):
from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, step_wrapper
# Define step configuration
config = ApiRouteConfig(
type="api",
name="hello",
path="/hello/:name",
method="GET"
)
# Define handler
async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
name = req.path_params.get("name", "World")
ctx.logger.info(f"Greeting {name}")
return ApiResponse(
status=200,
body={"message": f"Hello, {name}!"}
)
# Register step
step_wrapper(config, __file__, handler)
2. Run Your Application
# Start iii engine (in another terminal)
iii
# Run Motia
motia run --dir steps
3. Test Your Endpoint
curl http://localhost:3111/hello/Alice
# {"message": "Hello, Alice!"}
Step Types
API Steps
Handle HTTP requests with validation and type safety.
TypeScript
import type { ApiRouteConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'
export const config: ApiRouteConfig = {
type: 'api',
name: 'create-user',
path: '/users',
method: 'POST',
bodySchema: z.object({
name: z.string(),
email: z.string().email(),
}),
responseSchema: {
201: z.object({
id: z.string(),
name: z.string(),
email: z.string(),
}),
},
emits: ['user-created'],
}
export const handler: Handlers<typeof config> = async (req, { emit, logger }) => {
const user = await createUser(req.body)
// Emit event
await emit({ topic: 'user-created', data: user })
logger.info('User created', { userId: user.id })
return { status: 201, body: user }
}
Python
from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, step_wrapper
config = ApiRouteConfig(
type="api",
name="create-user",
path="/users",
method="POST",
body_schema={
"type": "object",
"properties": {
"name": {"type": "string"},
"email": {"type": "string", "format": "email"}
},
"required": ["name", "email"]
}
)
async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
user = await create_user(req.body)
# Emit event
await ctx.emit("user.created", user)
return ApiResponse(status=201, body=user)
step_wrapper(config, __file__, handler)
Event Steps
React to events from the event bus.
TypeScript
import type { EventConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'
export const config: EventConfig = {
type: 'event',
name: 'send-welcome-email',
subscribes: ['user-created'],
input: z.object({
id: z.string(),
name: z.string(),
email: z.string(),
}),
}
export const handler: Handlers<typeof config> = async (data, { logger }) => {
logger.info(`Sending welcome email to ${data.email}`)
await sendEmail(data.email, 'Welcome!')
}
Python
from motia import EventConfig, FlowContext, step_wrapper
config = EventConfig(
type="event",
name="send-welcome-email",
subscribes=["user.created"]
)
async def handler(data, ctx: FlowContext):
ctx.logger.info(f"Sending welcome email to {data['email']}")
await send_email(data['email'], "Welcome!")
step_wrapper(config, __file__, handler)
Cron Steps
Run tasks on a schedule using cron expressions.
TypeScript
import type { CronConfig, Handlers } from '@iii-dev/motia'
export const config: CronConfig = {
type: 'cron',
name: 'daily-report',
cron: '0 9 * * *', // Every day at 9 AM
emits: ['report-generated'],
}
export const handler: Handlers<typeof config> = async ({ emit, logger }) => {
const report = await generateDailyReport()
await emit({ topic: 'report-generated', data: report })
logger.info('Daily report generated')
}
Python
from motia import CronConfig, FlowContext, step_wrapper
config = CronConfig(
type="cron",
name="daily-report",
cron="0 9 * * *", # Every day at 9 AM
emits=["report.generated"]
)
async def handler(data, ctx: FlowContext):
report = await generate_daily_report()
await ctx.emit("report.generated", report)
ctx.logger.info("Daily report generated")
step_wrapper(config, __file__, handler)
Flow Context
Every handler receives a FlowContext with runtime utilities:
Naming Conventions: TypeScript uses camelCase (traceId, pathParams) while Python uses snake_case (trace_id, path_params). The examples below show both styles.
emit: Emit events to the event bus.
TypeScript:
await emit({ topic: 'topic-name', data: { key: 'value' } })
Python:
await ctx.emit("topic.name", {"key": "value"})
traceId (trace_id in Python): Distributed tracing identifier for correlating logs.
TypeScript:
logger.info(`[${traceId}] Processing request`)
Python:
ctx.logger.info(f"[{ctx.trace_id}] Processing request")
state: Internal state management (uses the $$internal-state stream).
TypeScript:
await state.set('config', 'theme', 'dark')
const theme = await state.get('config', 'theme')
Python:
await ctx.state.set("config", "theme", "dark")
theme = await ctx.state.get("config", "theme")
streams: Access to stream instances.
TypeScript:
await streams.todos.set('inbox', 'item-1', { title: 'Buy milk' })
Python:
todo_stream = Stream("todos")
await todo_stream.set("inbox", "item-1", {"title": "Buy milk"})
logger: Context-aware logger with trace ID.
TypeScript:
logger.info('Processing started')
logger.error('An error occurred')
Python:
ctx.logger.info("Processing started")
ctx.logger.error("An error occurred", exc_info=True)
Streams and State
Using Streams
Streams provide distributed state management:
from motia import Stream, ApiRequest, ApiResponse, FlowContext, step_wrapper
# Define stream
todo_stream = Stream("todos")
async def create_todo(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
todo_id = generate_id()
todo = {
"id": todo_id,
"title": req.body["title"],
"completed": False
}
# Store in stream
await todo_stream.set("inbox", todo_id, todo)
return ApiResponse(status=201, body=todo)
async def get_todos(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
# Get all todos in group
todos = await todo_stream.get_group("inbox")
return ApiResponse(status=200, body=todos)
Stream Hierarchy:
- Stream Name: todos (top-level namespace)
- Group ID: inbox (partition within stream)
- Item ID: todo-123 (unique identifier)
- Data: The actual JSON payload
Internal State
Use ctx.state for configuration and internal data:
async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
# Set state
await ctx.state.set("user-prefs", "theme", "dark")
# Get state
theme = await ctx.state.get("user-prefs", "theme")
# Get all in group
prefs = await ctx.state.get_group("user-prefs")
# Delete
await ctx.state.delete("user-prefs", "theme")
Middleware
Add middleware for authentication, logging, or validation:
from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, step_wrapper
# Define middleware
async def auth_middleware(req: ApiRequest, ctx: FlowContext, next_handler):
token = req.headers.get("authorization")
if not token:
return ApiResponse(status=401, body={"error": "Unauthorized"})
# Validate token
user = await validate_token(token)
if not user:
return ApiResponse(status=401, body={"error": "Invalid token"})
# Add user to context (you can modify req or ctx here)
ctx.user = user
# Call next handler
return await next_handler(req, ctx)
# Use middleware in step
config = ApiRouteConfig(
type="api",
name="protected-route",
path="/protected",
method="GET",
middleware=[auth_middleware] # Add middleware
)
async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
# ctx.user is available here
return ApiResponse(
status=200,
body={"message": f"Hello, {ctx.user['name']}"}
)
step_wrapper(config, __file__, handler)
Infrastructure Configuration
Define compute resources and queue behavior for your steps:
from motia import EventConfig, InfrastructureConfig, HandlerConfig, QueueConfig, step_wrapper
config = EventConfig(
type="event",
name="heavy-processing",
subscribes=["data.uploaded"],
infrastructure=InfrastructureConfig(
handler=HandlerConfig(
ram=512, # 512 MB memory
cpu=2, # 2 CPU cores
timeout=300 # 5 minute timeout
),
queue=QueueConfig(
type="fifo", # FIFO queue
max_retries=5, # Retry 5 times
visibility_timeout=60, # 1 minute visibility
delay_seconds=10 # 10 second delay
)
)
)
async def handler(data, ctx: FlowContext):
# Heavy processing logic
await process_large_file(data)
step_wrapper(config, __file__, handler)
CLI Commands
Project Structure
Organize your Motia project. Steps can use .ts, .js, or .py extensions:
TypeScript/JavaScript Project:
my-app/
├── steps/
│ ├── api/
│ │ ├── users.step.ts
│ │ └── orders.step.ts
│ ├── events/
│ │ ├── notifications.step.ts
│ │ └── analytics.step.ts
│ └── cron/
│ └── cleanup.step.ts
├── config.yaml # Engine configuration
├── package.json
└── tsconfig.json
Python Project:
my-app/
├── steps/
│ ├── api/
│ │ ├── users.step.py
│ │ └── orders.step.py
│ ├── events/
│ │ ├── notifications.step.py
│ │ └── analytics.step.py
│ └── cron/
│ └── cleanup.step.py
├── config.yaml # Engine configuration
└── requirements.txt
Complete Example: Todo API
This example shows a simple Todo API with create, list, and notification functionality.
TypeScript
steps/create_todo.step.ts
import type { ApiRouteConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'
const todoSchema = z.object({
id: z.string(),
title: z.string(),
completed: z.boolean(),
})
export const config: ApiRouteConfig = {
type: 'api',
name: 'create-todo',
path: '/todos',
method: 'POST',
bodySchema: z.object({
title: z.string(),
}),
responseSchema: {
201: todoSchema,
},
emits: ['todo-created'],
}
export const handler: Handlers<typeof config> = async (req, { streams, emit, logger }) => {
const todoId = `todo-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`
const todo = {
id: todoId,
title: req.body.title,
completed: false,
}
await streams.todos.set('default', todoId, todo)
await emit({ topic: 'todo-created', data: todo })
logger.info('Created todo', { todoId })
return { status: 201, body: todo }
}
steps/list_todos.step.ts
import type { ApiRouteConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'
export const config: ApiRouteConfig = {
type: 'api',
name: 'list-todos',
path: '/todos',
method: 'GET',
responseSchema: {
200: z.array(
z.object({
id: z.string(),
title: z.string(),
completed: z.boolean(),
}),
),
},
}
export const handler: Handlers<typeof config> = async (req, { streams }) => {
const todos = await streams.todos.getGroup('default')
return { status: 200, body: todos }
}
steps/notify_todo.step.ts
import type { EventConfig, Handlers } from '@iii-dev/motia'
import { z } from 'zod'
export const config: EventConfig = {
type: 'event',
name: 'notify-todo-created',
subscribes: ['todo-created'],
input: z.object({
id: z.string(),
title: z.string(),
completed: z.boolean(),
}),
}
export const handler: Handlers<typeof config> = async (data, { logger }) => {
logger.info('New todo created', { title: data.title })
// Send notification logic here
}
Python
steps/create_todo.step.py
from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, Stream, step_wrapper
todo_stream = Stream("todos")
config = ApiRouteConfig(
type="api",
name="create-todo",
path="/todos",
method="POST"
)
async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
import uuid
todo_id = str(uuid.uuid4())
todo = {
"id": todo_id,
"title": req.body["title"],
"completed": False
}
await todo_stream.set("default", todo_id, todo)
await ctx.emit("todo.created", todo)
ctx.logger.info(f"Created todo: {todo_id}")
return ApiResponse(status=201, body=todo)
step_wrapper(config, __file__, handler)
steps/list_todos.step.py
from motia import ApiRouteConfig, ApiRequest, ApiResponse, FlowContext, Stream, step_wrapper
todo_stream = Stream("todos")
config = ApiRouteConfig(
type="api",
name="list-todos",
path="/todos",
method="GET"
)
async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse:
todos = await todo_stream.get_group("default")
return ApiResponse(status=200, body=todos)
step_wrapper(config, __file__, handler)
steps/notify_todo.step.py
from motia import EventConfig, FlowContext, step_wrapper
config = EventConfig(
type="event",
name="notify-todo-created",
subscribes=["todo.created"]
)
async def handler(data, ctx: FlowContext):
ctx.logger.info(f"New todo created: {data['title']}")
# Send notification logic here
step_wrapper(config, __file__, handler)