Cron Jobs
Schedule recurring tasks with cron expressions and fan out work through the queue.
The cron trigger type fires a registered function on a schedule. The handler receives {trigger, job_id, scheduled_time, actual_time} as its payload. Work is typically fanned out by calling enqueue to downstream queue handlers.
Minimal cron function
import { registerWorker, Logger, TriggerAction } from 'iii-sdk'
const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')
iii.registerFunction(
{ id: 'cron.periodic_job', description: 'Fires on schedule and enqueues work' },
async () => {
const logger = new Logger()
logger.info('Periodic job fired')
await iii.trigger({
function_id: 'enqueue',
payload: {
topic: 'job.tick',
data: { firedAt: new Date().toISOString(), message: 'Periodic job executed' },
},
action: TriggerAction.Void(),
})
},
)
iii.registerTrigger({
type: 'cron',
function_id: 'cron.periodic_job',
config: { expression: '* * * * *' }, // every minute
})from datetime import datetime, timezone
from iii import register_worker, InitOptions, Logger, TriggerAction
iii = register_worker(address="ws://localhost:49134", options=InitOptions(worker_name="cron-worker"))
def periodic_job(_data) -> None:
logger = Logger()
logger.info("Periodic job fired")
iii.trigger({
"function_id": "enqueue",
"payload": {
"topic": "job.tick",
"data": {
"firedAt": datetime.now(timezone.utc).isoformat(),
"message": "Periodic job executed",
},
},
"action": TriggerAction.Void(),
})
iii.register_function({"id": "cron.periodic_job"}, periodic_job)
iii.register_trigger({
"type": "cron",
"function_id": "cron.periodic_job",
"config": {"expression": "* * * * *"}, # every minute
})use iii_sdk::{register_worker, InitOptions, Logger, TriggerRequest, TriggerAction, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;
let iii = register_worker("ws://127.0.0.1:49134", InitOptions::default());
iii.register_function(RegisterFunctionMessage { id: "cron.periodic_job".into(), ..Default::default() }, |_input| async move {
let logger = Logger();
logger.info("Periodic job fired", None);
iii.trigger(TriggerRequest::new("enqueue", json!({
"topic": "job.tick",
"data": {
"firedAt": chrono::Utc::now().to_rfc3339(),
"message": "Periodic job executed",
},
})).action(TriggerAction::void())).await?;
Ok(json!(null))
});
iii.register_trigger(RegisterTriggerInput { type_: "cron".into(), function_id: "cron.periodic_job".into(), config: json!({
"expression": "* * * * *",
}), ..Default::default() })?;Downstream queue handler
The function that consumes the cron's emitted event:
iii.registerFunction(
{ id: 'job.handle_tick', description: 'Processes the periodic job event' },
async (data: { firedAt: string; message: string }) => {
const logger = new Logger()
logger.info('Periodic job processed', { firedAt: data.firedAt, message: data.message })
},
)
iii.registerTrigger({
type: 'queue',
function_id: 'job.handle_tick',
config: { topic: 'job.tick' },
})from iii import Logger
def handle_tick(data: dict) -> None:
logger = Logger()
logger.info("Periodic job processed", {
"firedAt": data.get("firedAt"),
"message": data.get("message"),
})
iii.register_function({"id": "job.handle_tick"}, handle_tick)
iii.register_trigger({"type": "queue", "function_id": "job.handle_tick", "config": {"topic": "job.tick"}})use iii_sdk::{Logger, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;
iii.register_function(RegisterFunctionMessage { id: "job.handle_tick".into(), ..Default::default() }, |input| async move {
let logger = Logger();
logger.info("Periodic job processed", Some(input));
Ok(json!(null))
});
iii.register_trigger(RegisterTriggerInput { type_: "queue".into(), function_id: "job.handle_tick".into(), config: json!({ "topic": "job.tick" }), ..Default::default() })?;State sweep cron
A common pattern: read all records in a state scope, filter them by a business rule, and emit events for those that need action.
iii.registerFunction(
{ id: 'cron.orders_sweep', description: 'Checks for overdue orders every 5 minutes' },
async () => {
const logger = new Logger()
const orders = await iii.trigger<{
id: string
shipDate: string
complete: boolean
status: string
}[]>({ function_id: 'state::list', payload: { scope: 'orders' } })
let swept = 0
for (const order of orders ?? []) {
if (!order.complete && new Date() > new Date(order.shipDate)) {
logger.warn('Order overdue', { orderId: order.id, shipDate: order.shipDate })
await iii.trigger({
function_id: 'enqueue',
payload: {
topic: 'notification',
data: {
orderId: order.id,
templateId: 'order-audit-warning',
status: order.status,
shipDate: order.shipDate,
},
},
action: TriggerAction.Void(),
})
swept++
}
}
logger.info('Sweep complete', { checked: orders?.length ?? 0, swept })
},
)
iii.registerTrigger({
type: 'cron',
function_id: 'cron.orders_sweep',
config: { expression: '*/5 * * * *' },
})from datetime import datetime, timezone
from iii import Logger, TriggerAction
def orders_sweep(_data) -> None:
logger = Logger()
orders = iii.trigger({"function_id": "state::list", "payload": {"scope": "orders"}}) or []
swept = 0
for order in orders:
complete = order.get("complete", False)
ship_date_str = order.get("shipDate") or order.get("ship_date", "2099-01-01T00:00:00Z")
try:
ship_date = datetime.fromisoformat(ship_date_str.replace("Z", "+00:00"))
except ValueError:
continue
if not complete and datetime.now(timezone.utc) > ship_date:
logger.warn("Order overdue", {"orderId": order.get("id")})
iii.trigger({
"function_id": "enqueue",
"payload": {
"topic": "notification",
"data": {
"orderId": order.get("id"),
"templateId": "order-audit-warning",
"status": order.get("status"),
},
},
"action": TriggerAction.Void(),
})
swept += 1
logger.info("Sweep complete", {"checked": len(orders), "swept": swept})
iii.register_function({"id": "cron.orders_sweep"}, orders_sweep)
iii.register_trigger({"type": "cron", "function_id": "cron.orders_sweep", "config": {"expression": "*/5 * * * *"}})use iii_sdk::{Logger, TriggerRequest, TriggerAction, RegisterFunctionMessage, RegisterTriggerInput};
use serde_json::json;
iii.register_function(RegisterFunctionMessage { id: "cron.orders_sweep".into(), ..Default::default() }, |_input| async move {
let logger = Logger();
let orders_val = iii.trigger(TriggerRequest::new("state::list", json!({ "scope": "orders" }))).await?;
let orders = orders_val.as_array().cloned().unwrap_or_default();
let mut swept = 0usize;
for order in &orders {
let complete = order["complete"].as_bool().unwrap_or(false);
let ship_date_str = order["shipDate"].as_str().unwrap_or("2099-01-01T00:00:00Z");
if let Ok(ship_date) = chrono::DateTime::parse_from_rfc3339(ship_date_str) {
if !complete && chrono::Utc::now() > ship_date {
logger.warn("Order overdue", Some(json!({ "orderId": order["id"] })));
iii.trigger(TriggerRequest::new("enqueue", json!({
"topic": "notification",
"data": {
"orderId": order["id"],
"templateId": "order-audit-warning",
},
})).action(TriggerAction::void())).await?;
swept += 1;
}
}
}
logger.info("Sweep complete", Some(json!({
"checked": orders.len(),
"swept": swept,
})));
Ok(json!(null))
});
iii.register_trigger(RegisterTriggerInput { type_: "cron".into(), function_id: "cron.orders_sweep".into(), config: json!({ "expression": "*/5 * * * *" }), ..Default::default() })?;Cron expression format
iii uses a six-field extended cron format. The optional leading field is seconds.
┌──────────── second (0-59, optional)
│ ┌────────── minute (0-59)
│ │ ┌──────── hour (0-23)
│ │ │ ┌────── day of month (1-31)
│ │ │ │ ┌──── month (1-12)
│ │ │ │ │ ┌── day of week (0-7)
│ │ │ │ │ │
* * * * * *| Expression | Meaning |
|---|---|
* * * * * | Every minute |
0 * * * * | Every hour |
0 9 * * 1-5 | 09:00 on weekdays |
*/5 * * * * | Every 5 minutes |
0/5 * * * * * | Every 5 seconds (with seconds field) |
Key concepts
- Cron handlers receive
{trigger, job_id, scheduled_time, actual_time}as their payload. - Keep cron handlers lightweight. Use
enqueueto fan out work to queue handlers that can run in parallel. - Combine
state::listwith cron for periodic sweeps over accumulated data — the state sweep pattern above is the idiomatic approach.