Use Queues
How to enqueue work and process it asynchronously with retries, concurrency control, and ordering guarantees using Named Queues and Trigger Actions.
Goal
Offload work to a named queue so it can be processed asynchronously by any function — with built-in retries, concurrency control, and optional FIFO ordering. The new Named Queues API lets you define queues in config and enqueue work via trigger() with an action parameter, so target functions receive data normally with no handler changes.
Steps
1. Define named queues in config
Declare one or more named queues under queue_configs in your iii-config.yaml. Each queue has its own retry, concurrency, and ordering settings.
modules:
  - class: modules::queue::QueueModule
    config:
      queue_configs:
        default:
          max_retries: 5
          concurrency: 5
          type: standard
        payment:
          max_retries: 10
          concurrency: 2
          type: fifo
          message_group_field: transaction_id
      adapter:
        class: modules::queue::BuiltinQueueAdapter
        config:
          store_method: file_based # Options: in_memory, file_based
          file_path: ./data/queue_store # Required for file_based

The default queue is used when no queue name is specified. You can add as many named queues as you need.
Queue config fields
| Field | Type | Default | Description |
|---|---|---|---|
| max_retries | u32 | 3 | Max delivery attempts before moving to DLQ |
| concurrency | u32 | 10 | Max concurrent workers processing this queue |
| type | string | "standard" | "standard" processes jobs concurrently; "fifo" processes them in order |
| message_group_field | string | null | Required for FIFO queues. The JSON field in job data used for ordering (value must be non-null) |
| backoff_ms | u64 | 1000 | Base backoff delay between retries (ms) |
| poll_interval_ms | u64 | 100 | Worker poll interval (ms) |
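The field rules in the table can be checked before the engine ever loads the file. The following is a minimal sketch, not part of the SDK: `check_queue_config` and `DEFAULTS` are hypothetical names, and the checks only mirror what the table states (defaults, the two allowed `type` values, and the FIFO requirement for `message_group_field`).

```python
from typing import Any

# Defaults copied from the "Queue config fields" table above.
DEFAULTS: dict[str, Any] = {
    "max_retries": 3,
    "concurrency": 10,
    "type": "standard",
    "message_group_field": None,
    "backoff_ms": 1000,
    "poll_interval_ms": 100,
}


def check_queue_config(name: str, config: dict[str, Any]) -> list[str]:
    """Return the problems found in one queue_configs entry (hypothetical helper)."""
    merged = {**DEFAULTS, **config}
    problems = []

    unknown = sorted(set(config) - set(DEFAULTS))
    if unknown:
        problems.append(f"{name}: unknown fields {unknown}")
    if merged["type"] not in ("standard", "fifo"):
        problems.append(f"{name}: type must be 'standard' or 'fifo'")
    if merged["type"] == "fifo" and not merged["message_group_field"]:
        problems.append(f"{name}: fifo queues require message_group_field")
    for field in ("max_retries", "concurrency", "backoff_ms", "poll_interval_ms"):
        value = merged[field]
        # The table lists these as unsigned integers (u32 / u64).
        if isinstance(value, bool) or not isinstance(value, int) or value < 0:
            problems.append(f"{name}: {field} must be a non-negative integer")
    return problems


queue_configs = {
    "default": {"max_retries": 5, "concurrency": 5, "type": "standard"},
    "payment": {
        "max_retries": 10,
        "concurrency": 2,
        "type": "fifo",
        "message_group_field": "transaction_id",
    },
    "broken": {"type": "fifo"},  # deliberately missing message_group_field
}
report = {name: check_queue_config(name, cfg) for name, cfg in queue_configs.items()}
```

Here `report["broken"]` contains the FIFO complaint, while the two queues from the config example produce empty lists.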
2. Enqueue work via trigger action
From any function, enqueue a job by calling trigger() with the Enqueue action and the target queue name. The enqueue operation is asynchronous — the caller does not wait for the job to be processed — but the SDK does await an acknowledgement from the engine confirming the job was accepted. The returned result contains { messageReceiptId: string } on success, or an enqueue_error if the queue was not found or FIFO validation failed.
import { registerWorker, Logger, TriggerAction } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'orders::create' }, async (input) => {
  const logger = new Logger()
  const order = { id: crypto.randomUUID(), ...input }

  // Enqueue to the 'payment' queue; process-order receives `order` as its input
  await iii.trigger({
    function_id: 'orders::process-order',
    payload: order,
    action: TriggerAction.Enqueue({ queue: 'payment' }),
  })

  logger.info('Order enqueued', { orderId: order.id })
  return { orderId: order.id }
})

import os
import uuid

from iii import Logger, TriggerAction, register_worker

iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


def create_order(input):
    logger = Logger()
    order = {"id": str(uuid.uuid4()), **input}

    # Enqueue to the 'payment' queue
    iii.trigger({
        "function_id": "orders::process-order",
        "payload": order,
        "action": TriggerAction.Enqueue(queue="payment"),
    })

    logger.info("Order enqueued", {"orderId": order["id"]})
    return {"orderId": order["id"]}


iii.register_function({"id": "orders::create"}, create_order)

use iii_sdk::{
    register_worker, InitOptions, Logger, RegisterFunctionMessage,
    TriggerAction, TriggerRequest,
};
use serde_json::json;

let iii = register_worker(
    &std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string()),
    InitOptions::default(),
);

let iii_clone = iii.clone();
iii.register_function(
    RegisterFunctionMessage {
        id: "orders::create".to_string(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    move |input| {
        let iii = iii_clone.clone();
        async move {
            let logger = Logger::new();
            let order_id = uuid::Uuid::new_v4().to_string();
            let order = json!({
                "id": order_id,
                "items": input["items"],
                "total": input["total"],
            });

            iii.trigger(TriggerRequest {
                function_id: "orders::process-order".to_string(),
                payload: order,
                action: Some(TriggerAction::Enqueue { queue: "payment".to_string() }),
                timeout_ms: None,
            })
            .await?;

            logger.info("Order enqueued", Some(json!({ "orderId": order_id })));
            Ok(json!({ "orderId": order_id }))
        }
    },
);

No handler changes needed
The target function (orders::process-order) receives the enqueued data as its normal input argument.
You do not need to register a queue trigger or unwrap a special envelope — the queue is transparent to the consumer.
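Because the enqueue call returns either a receipt or an enqueue_error, the caller may want to fail loudly when no receipt comes back. The sketch below is an assumption-heavy illustration: `receipt_id_or_raise` and `EnqueueFailed` are hypothetical names, and it assumes the Python SDK hands back a dict carrying `messageReceiptId` on success. How an enqueue_error is surfaced in Python is not specified here, so anything without a receipt id is treated as a failure.

```python
from typing import Any


class EnqueueFailed(Exception):
    """Raised by this sketch when no receipt id is present (hypothetical type)."""


def receipt_id_or_raise(result: Any) -> str:
    """Return the receipt id from an enqueue result, or raise EnqueueFailed."""
    # Assumption: a successful enqueue result is a dict with "messageReceiptId".
    if isinstance(result, dict) and isinstance(result.get("messageReceiptId"), str):
        return result["messageReceiptId"]
    raise EnqueueFailed(f"enqueue was not acknowledged: {result!r}")


# Stand-in values; in a worker this would be the value returned by iii.trigger(...).
ok = receipt_id_or_raise({"messageReceiptId": "abc-123"})
```

Logging the receipt id next to the order id makes it easier to correlate a job with its later processing or dead-lettering.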
3. Write the target function
The function that processes queued work is a regular function. It receives the data you passed to trigger() as its input. When it returns successfully the job is acknowledged and removed. If it throws, the job is retried according to the queue's max_retries and backoff_ms settings.
import { registerWorker, Logger } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'orders::process-order' }, async (order) => {
  const logger = new Logger()
  logger.info('Processing payment', { orderId: order.id })
  // ...payment logic...
  return { processed: true }
})

import os
from iii import Logger, register_worker

iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


def process_order(order):
    logger = Logger()
    logger.info("Processing payment", {"orderId": order["id"]})
    # ...payment logic...
    return {"processed": True}


iii.register_function({"id": "orders::process-order"}, process_order)

use iii_sdk::{register_worker, InitOptions, Logger, RegisterFunctionMessage};
use serde_json::json;

let iii = register_worker(
    &std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string()),
    InitOptions::default(),
);

iii.register_function(
    RegisterFunctionMessage {
        id: "orders::process-order".to_string(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    |order| async move {
        let logger = Logger::new();
        let order_id = order["id"].as_str().unwrap_or("");
        logger.info("Processing payment", Some(json!({ "orderId": order_id })));
        // ...payment logic...
        Ok(json!({ "processed": true }))
    },
);

A worker can also enqueue further work, creating processing pipelines:
iii.registerFunction({ id: 'orders::process-order' }, async (order) => {
  // ...charge the customer...
  await iii.trigger({
    function_id: 'notifications::send',
    payload: { orderId: order.id, type: 'payment-confirmed' },
    action: TriggerAction.Enqueue({ queue: 'default' }),
  })
  return { processed: true }
})

def process_order(order):
    # ...charge the customer...
    iii.trigger({
        "function_id": "notifications::send",
        "payload": {"orderId": order["id"], "type": "payment-confirmed"},
        "action": TriggerAction.Enqueue(queue="default"),
    })
    return {"processed": True}

use iii_sdk::{RegisterFunctionMessage, TriggerAction, TriggerRequest};
use serde_json::json;

// Clone the handle so the handler can own a copy, as in the orders::create example
let iii_clone = iii.clone();
iii.register_function(
    RegisterFunctionMessage {
        id: "orders::process-order".to_string(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    move |order| {
        let iii = iii_clone.clone();
        async move {
            // ...charge the customer...
            iii.trigger(TriggerRequest {
                function_id: "notifications::send".to_string(),
                payload: json!({
                    "orderId": order["id"],
                    "type": "payment-confirmed",
                }),
                action: Some(TriggerAction::Enqueue { queue: "default".to_string() }),
                timeout_ms: None,
            })
            .await?;

            Ok(json!({ "processed": true }))
        }
    },
);

4. Use FIFO queues for ordered processing
When order matters (e.g. payment transactions for the same account), use a FIFO queue. Set type: fifo and specify message_group_field — the field in your job data whose value determines the ordering group. Jobs with the same group value are processed strictly in order. The field named by message_group_field must be present and non-null in every job payload — the engine rejects enqueue requests where the field is missing or null.
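Since the engine rejects payloads whose group field is missing or null, it can be convenient to run the same check client-side before calling trigger(). The sketch below is illustrative only: `group_key` and `split_into_groups` are hypothetical helpers, and the grouping merely shows which jobs share an ordering group. It says nothing about how the engine schedules work across groups.

```python
from collections import defaultdict
from typing import Any


def group_key(payload: dict[str, Any], field: str) -> Any:
    """Return the ordering-group value, or raise if it is missing or null."""
    value = payload.get(field)
    if value is None:
        raise ValueError(f"payload needs a non-null {field!r} for FIFO queues")
    return value


def split_into_groups(payloads: list[dict[str, Any]], field: str) -> dict[Any, list]:
    """Bucket payloads by group value, keeping enqueue order inside each bucket."""
    groups: dict[Any, list] = defaultdict(list)
    for payload in payloads:
        groups[group_key(payload, field)].append(payload)
    return dict(groups)


jobs = [
    {"transaction_id": "txn-a", "step": 1},
    {"transaction_id": "txn-b", "step": 1},
    {"transaction_id": "txn-a", "step": 2},
]
groups = split_into_groups(jobs, "transaction_id")
```

In this example the two `txn-a` jobs land in the same group in their original order, which is the ordering a FIFO queue guarantees for jobs sharing a group value.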
queue_configs:
  payment:
    max_retries: 10
    concurrency: 2
    type: fifo
    message_group_field: transaction_id # job payloads must include a non-null transaction_id

The job data must contain the field specified by message_group_field:
// The 'transaction_id' field in the data is used for FIFO ordering
await iii.trigger({
  function_id: 'payments::process',
  payload: { transaction_id: 'txn-abc-123', amount: 49.99, currency: 'USD' },
  action: TriggerAction.Enqueue({ queue: 'payment' }),
})

# The 'transaction_id' field in the data is used for FIFO ordering
iii.trigger({
    "function_id": "payments::process",
    "payload": {
        "transaction_id": "txn-abc-123",
        "amount": 49.99,
        "currency": "USD",
    },
    "action": TriggerAction.Enqueue(queue="payment"),
})

use iii_sdk::{TriggerAction, TriggerRequest};
use serde_json::json;

iii.trigger(TriggerRequest {
    function_id: "payments::process".to_string(),
    payload: json!({
        "transaction_id": "txn-abc-123",
        "amount": 49.99,
        "currency": "USD",
    }),
    action: Some(TriggerAction::Enqueue { queue: "payment".to_string() }),
    timeout_ms: None,
})
.await?;

5. Use Void for fire-and-forget calls
When you want to trigger a function without waiting for a response and without going through a queue, use the Void action. This replaces the old triggerVoid method.
import { TriggerAction } from 'iii-sdk'

// Fire-and-forget: no queue, no response
await iii.trigger({
  function_id: 'notifications::send-email',
  payload: { to: 'user@example.com', subject: 'Hello' },
  action: TriggerAction.Void(),
})

from iii import TriggerAction

# Fire-and-forget: no queue, no response
iii.trigger({
    "function_id": "notifications::send-email",
    "payload": {"to": "user@example.com", "subject": "Hello"},
    "action": TriggerAction.Void(),
})

use iii_sdk::{TriggerAction, TriggerRequest};
use serde_json::json;

iii.trigger(TriggerRequest {
    function_id: "notifications::send-email".to_string(),
    payload: json!({
        "to": "user@example.com",
        "subject": "Hello",
    }),
    action: Some(TriggerAction::Void),
    timeout_ms: None,
})
.await?;

6. Trigger the workflow
With the queue config and functions in place, kick off the workflow by calling the function that creates work:
const { orderId } = await iii.trigger({
  function_id: 'orders::create',
  payload: { items: ['item-1', 'item-2'], total: 99.99 },
})
logger.info('Order created', { orderId })

result = iii.trigger({
    "function_id": "orders::create",
    "payload": {"items": ["item-1", "item-2"], "total": 99.99},
})
print("Order created", result["orderId"])

use iii_sdk::TriggerRequest;
use serde_json::json;

let result = iii
    .trigger(TriggerRequest {
        function_id: "orders::create".to_string(),
        payload: json!({
            "items": ["item-1", "item-2"],
            "total": 99.99,
        }),
        action: None,
        timeout_ms: None,
    })
    .await?;
println!("Order created: {}", result["orderId"]);

API Reference
Trigger actions
| Action | Behavior | SDK Usage |
|---|---|---|
| (none) | Direct call — waits for response | trigger({ function_id: 'fn', payload: data }) |
| Enqueue | Async via named queue; returns { messageReceiptId: string } or enqueue_error | trigger({ function_id: 'fn', payload: data, action: TriggerAction.Enqueue({ queue: 'name' }) }) |
| Void | Fire-and-forget, no queue | trigger({ function_id: 'fn', payload: data, action: TriggerAction.Void() }) |
SDK quick reference
import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

// Direct call (request/response)
const result = await iii.trigger({
  function_id: 'get-order',
  payload: { id: '123' },
})

// Enqueue to named queue (async, returns acknowledgement)
await iii.trigger({
  function_id: 'process-order',
  payload: orderData,
  action: TriggerAction.Enqueue({ queue: 'payment' }),
})

// Void (fire-and-forget, no queue)
await iii.trigger({
  function_id: 'send-notification',
  payload: data,
  action: TriggerAction.Void(),
})

import os
from iii import TriggerAction, register_worker

iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))

# Direct call (request/response)
result = iii.trigger({
    "function_id": "get-order",
    "payload": {"id": "123"},
})

# Enqueue to named queue (async, returns acknowledgement)
iii.trigger({
    "function_id": "process-order",
    "payload": order_data,
    "action": TriggerAction.Enqueue(queue="payment"),
})

# Void (fire-and-forget, no queue)
iii.trigger({
    "function_id": "send-notification",
    "payload": data,
    "action": TriggerAction.Void(),
})

use iii_sdk::{register_worker, InitOptions, TriggerAction, TriggerRequest};
use serde_json::json;

let iii = register_worker(
    &std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string()),
    InitOptions::default(),
);

// Direct call (request/response)
let result = iii
    .trigger(TriggerRequest {
        function_id: "get-order".to_string(),
        payload: json!({ "id": "123" }),
        action: None,
        timeout_ms: None,
    })
    .await?;

// Enqueue to named queue (async, returns acknowledgement)
iii.trigger(TriggerRequest {
    function_id: "process-order".to_string(),
    payload: order_data,
    action: Some(TriggerAction::Enqueue { queue: "payment".to_string() }),
    timeout_ms: None,
})
.await?;

// Void (fire-and-forget, no queue)
iii.trigger(TriggerRequest {
    function_id: "send-notification".to_string(),
    payload: data,
    action: Some(TriggerAction::Void),
    timeout_ms: None,
})
.await?;

Migrating from the old topic-based API
Topic-based queues
The topic-based API (triggerVoid('enqueue', { topic, data }) and registerTrigger({ type: 'queue', ... })) remains valid. Named Queues + Trigger Actions offer config-driven retries, concurrency, and FIFO ordering.
Enqueue: before and after
// Old: enqueue via triggerVoid with topic
iii.triggerVoid('enqueue', {
  topic: 'order.created',
  data: order,
})

// Old: register a queue trigger to consume
iii.registerTrigger({
  type: 'queue',
  function_id: 'orders::process-payment',
  config: { topic: 'order.created' },
})

// New: enqueue directly to the target function via a named queue
await iii.trigger({
  function_id: 'orders::process-payment',
  payload: order,
  action: TriggerAction.Enqueue({ queue: 'payment' }),
})
// No trigger registration needed; the function receives data normally

# Old: enqueue via trigger with topic
iii.trigger({
    "function_id": "enqueue",
    "payload": {"topic": "order.created", "data": order},
})

# Old: register a queue trigger to consume
iii.register_trigger({
    "type": "queue",
    "function_id": "orders::process-payment",
    "config": {"topic": "order.created"},
})

# New: enqueue directly to the target function via a named queue
iii.trigger({
    "function_id": "orders::process-payment",
    "payload": order,
    "action": TriggerAction.Enqueue(queue="payment"),
})
# No trigger registration needed

use iii_sdk::{TriggerAction, TriggerRequest};
use serde_json::json;

// Old: enqueue via trigger with topic
iii.trigger(TriggerRequest {
    function_id: "enqueue".to_string(),
    payload: json!({"topic": "order.created", "data": order}),
    action: Some(TriggerAction::Void),
    timeout_ms: None,
})
.await?;

// Old: register a queue trigger to consume
iii.register_trigger("queue", "orders::process-payment", json!({
    "topic": "order.created",
}))?;

use iii_sdk::{TriggerAction, TriggerRequest};

// New: enqueue directly to the target function via a named queue
iii.trigger(TriggerRequest {
    function_id: "orders::process-payment".to_string(),
    payload: order,
    action: Some(TriggerAction::Enqueue { queue: "payment".to_string() }),
    timeout_ms: None,
})
.await?;

triggerVoid: before and after
// Before
iii.triggerVoid('send-notification', data)

// After
await iii.trigger({
  function_id: 'send-notification',
  payload: data,
  action: TriggerAction.Void(),
})

# Before
iii.trigger({
    "function_id": "send-notification",
    "payload": data,
})

# After
iii.trigger({
    "function_id": "send-notification",
    "payload": data,
    "action": TriggerAction.Void(),
})

use iii_sdk::{TriggerAction, TriggerRequest};

// Before
iii.trigger(TriggerRequest {
    function_id: "send-notification".to_string(),
    payload: data,
    action: Some(TriggerAction::Void),
    timeout_ms: None,
})
.await?;

use iii_sdk::{TriggerAction, TriggerRequest};

// After
iii.trigger(TriggerRequest {
    function_id: "send-notification".to_string(),
    payload: data,
    action: Some(TriggerAction::Void),
    timeout_ms: None,
})
.await?;

Result
The engine acknowledges each job as soon as it is accepted, and the target function then processes it asynchronously. A failed job is retried with a backoff based on the queue's backoff_ms setting, up to the queue's max_retries limit. Once retries are exhausted, the job moves to a dead letter queue (DLQ), where it is preserved for inspection or reprocessing.
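The retry-then-dead-letter lifecycle can be pictured with a small in-memory model. This is a sketch for intuition only, not the engine's implementation: `deliver` is a hypothetical function, max_retries is read as the total number of delivery attempts (following the wording in the config table), and the backoff wait is left out so the example runs instantly.

```python
from typing import Any, Callable


def deliver(job: Any, handler: Callable[[Any], Any], max_retries: int,
            dead_letters: list) -> tuple[str, int]:
    """Call handler up to max_retries times; park the job in dead_letters on exhaustion."""
    attempts = 0
    last_error = None
    while attempts < max_retries:
        attempts += 1
        try:
            handler(job)
            return ("acknowledged", attempts)  # success: the job is removed
        except Exception as exc:  # any raised error counts as a failed delivery
            last_error = exc
            # A real engine would wait for a backoff_ms-based delay here (omitted).
    dead_letters.append({"job": job, "error": str(last_error), "attempts": attempts})
    return ("dead_lettered", attempts)


calls = {"n": 0}


def flaky(job):
    """Fail twice, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("payment gateway timeout")


def always_fails(job):
    raise RuntimeError("card declined")


dlq: list = []
first = deliver({"id": "order-1"}, flaky, max_retries=5, dead_letters=dlq)
second = deliver({"id": "order-2"}, always_fails, max_retries=5, dead_letters=dlq)
```

In this run the flaky handler is acknowledged on its third attempt, while the always-failing job uses all five attempts and ends up in `dlq` with its last error attached.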
Dead letter queues
See Manage Failed Triggers for DLQ configuration and redrive.