iii

Use Queues

How to enqueue work and process it asynchronously with retries, concurrency control, and ordering guarantees using Named Queues and Trigger Actions.

Goal

Offload work to a named queue so it can be processed asynchronously by any function — with built-in retries, concurrency control, and optional FIFO ordering. The new Named Queues API lets you define queues in config and enqueue work via trigger() with an action parameter, so target functions receive data normally with no handler changes.

Steps

1. Define named queues in config

Declare one or more named queues under queue_configs in your iii-config.yaml. Each queue has its own retry, concurrency, and ordering settings.

iii-config.yaml
modules:
  - class: modules::queue::QueueModule
    config:
      queue_configs:
        default:
          max_retries: 5
          concurrency: 5
          type: standard
        payment:
          max_retries: 10
          concurrency: 2
          type: fifo
          message_group_field: transaction_id
      adapter:
        class: modules::queue::BuiltinQueueAdapter
        config:
          store_method: file_based        # Options: in_memory, file_based
          file_path: ./data/queue_store   # Required for file_based

The default queue is used when no queue name is specified. You can add as many named queues as you need.

Queue config fields

FieldTypeDefaultDescription
max_retriesu323Max delivery attempts before moving to DLQ
concurrencyu3210Max concurrent workers processing this queue
typestring"standard""standard" processes jobs concurrently; "fifo" processes them in order
message_group_fieldstringnullRequired for FIFO — the JSON field in job data used for ordering (value must be non-null)
backoff_msu641000Base backoff delay between retries (ms)
poll_interval_msu64100Worker poll interval (ms)

2. Enqueue work via trigger action

From any function, enqueue a job by calling trigger() with the Enqueue action and the target queue name. The enqueue operation is asynchronous — the caller does not wait for the job to be processed — but the SDK does await an acknowledgement from the engine confirming the job was accepted. The returned result contains { messageReceiptId: string } on success, or an enqueue_error if the queue was not found or FIFO validation failed.

enqueue-order.ts
import { registerWorker, Logger, TriggerAction } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'orders::create' }, async (input) => {
  const logger = new Logger()
  const order = { id: crypto.randomUUID(), ...input }

  // Enqueue to the 'payment' queue — process-order receives `order` as its input
  await iii.trigger({
    function_id: 'orders::process-order',
    payload: order,
    action: TriggerAction.Enqueue({ queue: 'payment' }),
  })

  logger.info('Order enqueued', { orderId: order.id })
  return { orderId: order.id }
})
enqueue_order.py
import os
import uuid

from iii import Logger, TriggerAction, register_worker

iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


def create_order(input):
    logger = Logger()
    order = {"id": str(uuid.uuid4()), **input}

    # Enqueue to the 'payment' queue
    iii.trigger({
        "function_id": "orders::process-order",
        "payload": order,
        "action": TriggerAction.Enqueue(queue="payment"),
    })

    logger.info("Order enqueued", {"orderId": order["id"]})
    return {"orderId": order["id"]}


iii.register_function({"id": "orders::create"}, create_order)
enqueue_order.rs
use iii_sdk::{
    register_worker, InitOptions, Logger, RegisterFunctionMessage,
    TriggerAction, TriggerRequest,
};
use serde_json::json;

let iii = register_worker(
    &std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string()),
    InitOptions::default(),
);

let iii_clone = iii.clone();
iii.register_function(
    RegisterFunctionMessage {
        id: "orders::create".to_string(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    move |input| {
        let iii = iii_clone.clone();
        async move {
            let logger = Logger::new();
            let order_id = uuid::Uuid::new_v4().to_string();
            let order = json!({
                "id": order_id,
                "items": input["items"],
                "total": input["total"],
            });

            iii.trigger(TriggerRequest {
                function_id: "orders::process-order".to_string(),
                payload: order,
                action: Some(TriggerAction::Enqueue { queue: "payment".to_string() }),
                timeout_ms: None,
            })
            .await?;

            logger.info("Order enqueued", Some(json!({ "orderId": order_id })));
            Ok(json!({ "orderId": order_id }))
        }
    },
);

No handler changes needed

The target function (orders::process-order) receives the enqueued data as its normal input argument. You do not need to register a queue trigger or unwrap a special envelope — the queue is transparent to the consumer.

3. Write the target function

The function that processes queued work is a regular function. It receives the data you passed to trigger() as its input. When it returns successfully the job is acknowledged and removed. If it throws, the job is retried according to the queue's max_retries and backoff_ms settings.

process-order.ts
import { registerWorker, Logger } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'orders::process-order' }, async (order) => {
  const logger = new Logger()
  logger.info('Processing payment', { orderId: order.id })
  // ...payment logic...
  return { processed: true }
})
process_order.py
import os

from iii import Logger, register_worker

iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


def process_order(order):
    logger = Logger()
    logger.info("Processing payment", {"orderId": order["id"]})
    # ...payment logic...
    return {"processed": True}


iii.register_function({"id": "orders::process-order"}, process_order)
process_order.rs
use iii_sdk::{register_worker, InitOptions, Logger, RegisterFunctionMessage};
use serde_json::json;

let iii = register_worker(
    &std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string()),
    InitOptions::default(),
);

iii.register_function(
    RegisterFunctionMessage {
        id: "orders::process-order".to_string(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    |order| async move {
        let logger = Logger::new();
        let order_id = order["id"].as_str().unwrap_or("");
        logger.info("Processing payment", Some(json!({ "orderId": order_id })));
        // ...payment logic...
        Ok(json!({ "processed": true }))
    },
);

A worker can also enqueue further work, creating processing pipelines:

iii.registerFunction({ id: 'orders::process-order' }, async (order) => {
  // ...charge the customer...

  await iii.trigger({
    function_id: 'notifications::send',
    payload: { orderId: order.id, type: 'payment-confirmed' },
    action: TriggerAction.Enqueue({ queue: 'default' }),
  })

  return { processed: true }
})
def process_order(order):
    # ...charge the customer...

    iii.trigger({
        "function_id": "notifications::send",
        "payload": {"orderId": order["id"], "type": "payment-confirmed"},
        "action": TriggerAction.Enqueue(queue="default"),
    })

    return {"processed": True}
use iii_sdk::{RegisterFunctionMessage, TriggerAction, TriggerRequest};
use serde_json::json;

iii.register_function(
    RegisterFunctionMessage {
        id: "orders::process-order".to_string(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    |order| async move {
        iii.trigger(TriggerRequest {
            function_id: "notifications::send".to_string(),
            payload: json!({
                "orderId": order["id"],
                "type": "payment-confirmed",
            }),
            action: Some(TriggerAction::Enqueue { queue: "default".to_string() }),
            timeout_ms: None,
        })
        .await?;

        Ok(json!({ "processed": true }))
    },
);

4. Use FIFO queues for ordered processing

When order matters (e.g. payment transactions for the same account), use a FIFO queue. Set type: fifo and specify message_group_field — the field in your job data whose value determines the ordering group. Jobs with the same group value are processed strictly in order. The field named by message_group_field must be present and non-null in every job payload — the engine rejects enqueue requests where the field is missing or null.

iii-config.yaml (excerpt)
queue_configs:
  payment:
    max_retries: 10
    concurrency: 2
    type: fifo
    message_group_field: transaction_id  # job payloads must include a non-null transaction_id

The job data must contain the field specified by message_group_field:

// The 'transaction_id' field in the data is used for FIFO ordering
await iii.trigger({
  function_id: 'payments::process',
  payload: { transaction_id: 'txn-abc-123', amount: 49.99, currency: 'USD' },
  action: TriggerAction.Enqueue({ queue: 'payment' }),
})
# The 'transaction_id' field in the data is used for FIFO ordering
iii.trigger({
    "function_id": "payments::process",
    "payload": {
        "transaction_id": "txn-abc-123",
        "amount": 49.99,
        "currency": "USD",
    },
    "action": TriggerAction.Enqueue(queue="payment"),
})
use iii_sdk::{TriggerAction, TriggerRequest};
use serde_json::json;

iii.trigger(TriggerRequest {
    function_id: "payments::process".to_string(),
    payload: json!({
        "transaction_id": "txn-abc-123",
        "amount": 49.99,
        "currency": "USD",
    }),
    action: Some(TriggerAction::Enqueue { queue: "payment".to_string() }),
    timeout_ms: None,
})
.await?;

5. Use Void for fire-and-forget calls

When you want to trigger a function without waiting for a response and without going through a queue, use the Void action. This replaces the old triggerVoid method.

import { TriggerAction } from 'iii-sdk'

// Fire-and-forget — no queue, no response
await iii.trigger({
  function_id: 'notifications::send-email',
  payload: { to: 'user@example.com', subject: 'Hello' },
  action: TriggerAction.Void(),
})
from iii import TriggerAction

# Fire-and-forget — no queue, no response
iii.trigger({
    "function_id": "notifications::send-email",
    "payload": {"to": "user@example.com", "subject": "Hello"},
    "action": TriggerAction.Void(),
})
use iii_sdk::{TriggerAction, TriggerRequest};
use serde_json::json;

iii.trigger(TriggerRequest {
    function_id: "notifications::send-email".to_string(),
    payload: json!({
        "to": "user@example.com",
        "subject": "Hello",
    }),
    action: Some(TriggerAction::Void),
    timeout_ms: None,
})
.await?;

6. Trigger the workflow

With the queue config and functions in place, kick off the workflow by calling the function that creates work:

const { orderId } = await iii.trigger({
  function_id: 'orders::create',
  payload: { items: ['item-1', 'item-2'], total: 99.99 },
})
logger.info('Order created', { orderId })
result = iii.trigger({
    "function_id": "orders::create",
    "payload": {"items": ["item-1", "item-2"], "total": 99.99},
})
print("Order created", result["orderId"])
use iii_sdk::TriggerRequest;
use serde_json::json;

let result = iii
    .trigger(TriggerRequest {
        function_id: "orders::create".to_string(),
        payload: json!({
            "items": ["item-1", "item-2"],
            "total": 99.99,
        }),
        action: None,
        timeout_ms: None,
    })
    .await?;
println!("Order created: {}", result["orderId"]);

API Reference

Trigger actions

ActionBehaviorSDK Usage
(none)Direct call — waits for responsetrigger({ function_id: 'fn', payload: data })
EnqueueAsync via named queue — returns { messageReceiptId: string } or enqueue_errortrigger({ function_id: 'fn', payload: data, action: TriggerAction.Enqueue({ queue: 'name' }) })
VoidFire-and-forget, no queuetrigger({ function_id: 'fn', payload: data, action: TriggerAction.Void() })

SDK quick reference

import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

// Direct call (request/response)
const result = await iii.trigger({
  function_id: 'get-order',
  payload: { id: '123' },
})

// Enqueue to named queue (async, returns acknowledgement)
await iii.trigger({
  function_id: 'process-order',
  payload: orderData,
  action: TriggerAction.Enqueue({ queue: 'payment' }),
})

// Void (fire-and-forget, no queue)
await iii.trigger({
  function_id: 'send-notification',
  payload: data,
  action: TriggerAction.Void(),
})
import os

from iii import TriggerAction, register_worker

iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))

# Direct call (request/response)
result = iii.trigger({
    "function_id": "get-order",
    "payload": {"id": "123"},
})

# Enqueue to named queue (async, returns acknowledgement)
iii.trigger({
    "function_id": "process-order",
    "payload": order_data,
    "action": TriggerAction.Enqueue(queue="payment"),
})

# Void (fire-and-forget, no queue)
iii.trigger({
    "function_id": "send-notification",
    "payload": data,
    "action": TriggerAction.Void(),
})
use iii_sdk::{register_worker, InitOptions, TriggerAction, TriggerRequest};
use serde_json::json;

let iii = register_worker(
    &std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string()),
    InitOptions::default(),
);

// Direct call (request/response)
let result = iii
    .trigger(TriggerRequest {
        function_id: "get-order".to_string(),
        payload: json!({ "id": "123" }),
        action: None,
        timeout_ms: None,
    })
    .await?;

// Enqueue to named queue (async, returns acknowledgement)
iii.trigger(TriggerRequest {
    function_id: "process-order".to_string(),
    payload: order_data,
    action: Some(TriggerAction::Enqueue { queue: "payment".to_string() }),
    timeout_ms: None,
})
.await?;

// Void (fire-and-forget, no queue)
iii.trigger(TriggerRequest {
    function_id: "send-notification".to_string(),
    payload: data,
    action: Some(TriggerAction::Void),
    timeout_ms: None,
})
.await?;

Migrating from the old topic-based API

Topic-based queues

The topic-based API (triggerVoid('enqueue', { topic, data }) and registerTrigger({ type: 'queue', ... })) remains valid. Named Queues + Trigger Actions offer config-driven retries, concurrency, and FIFO ordering.

Enqueue: before and after

Before (deprecated)
// Old: enqueue via triggerVoid with topic
iii.triggerVoid('enqueue', {
  topic: 'order.created',
  data: order,
})

// Old: register a queue trigger to consume
iii.registerTrigger({
  type: 'queue',
  function_id: 'orders::process-payment',
  config: { topic: 'order.created' },
})
After (Named Queues)
// New: enqueue directly to the target function via a named queue
await iii.trigger({
  function_id: 'orders::process-payment',
  payload: order,
  action: TriggerAction.Enqueue({ queue: 'payment' }),
})

// No trigger registration needed — the function receives data normally
Before (deprecated)
# Old: enqueue via trigger with topic
iii.trigger({
    "function_id": "enqueue",
    "payload": {"topic": "order.created", "data": order},
})

# Old: register a queue trigger to consume
iii.register_trigger({
    "type": "queue",
    "function_id": "orders::process-payment",
    "config": {"topic": "order.created"},
})
After (Named Queues)
# New: enqueue directly to the target function via a named queue
iii.trigger({
    "function_id": "orders::process-payment",
    "payload": order,
    "action": TriggerAction.Enqueue(queue="payment"),
})

# No trigger registration needed
Before (deprecated)
use iii_sdk::{TriggerAction, TriggerRequest};
use serde_json::json;

// Old: enqueue via trigger with topic
iii.trigger(TriggerRequest {
    function_id: "enqueue".to_string(),
    payload: json!({"topic": "order.created", "data": order}),
    action: Some(TriggerAction::Void),
    timeout_ms: None,
}).await?;

// Old: register a queue trigger to consume
iii.register_trigger("queue", "orders::process-payment", json!({
    "topic": "order.created",
}))?;
After (Named Queues)
use iii_sdk::{TriggerAction, TriggerRequest};

iii.trigger(TriggerRequest {
    function_id: "orders::process-payment".to_string(),
    payload: order,
    action: Some(TriggerAction::Enqueue { queue: "payment".to_string() }),
    timeout_ms: None,
})
.await?;

triggerVoid: before and after

Before (deprecated)
iii.triggerVoid('send-notification', data)
After
await iii.trigger({
  function_id: 'send-notification',
  payload: data,
  action: TriggerAction.Void(),
})
Before (deprecated)
iii.trigger({
    "function_id": "send-notification",
    "payload": data,
})
After
iii.trigger({
    "function_id": "send-notification",
    "payload": data,
    "action": TriggerAction.Void(),
})
Before (deprecated)
use iii_sdk::{TriggerAction, TriggerRequest};

iii.trigger(TriggerRequest {
    function_id: "send-notification".to_string(),
    payload: data,
    action: Some(TriggerAction::Void),
    timeout_ms: None,
}).await?;
After
use iii_sdk::{TriggerAction, TriggerRequest};

iii.trigger(TriggerRequest {
    function_id: "send-notification".to_string(),
    payload: data,
    action: Some(TriggerAction::Void),
    timeout_ms: None,
})
.await?;

Result

Jobs are enqueued instantly and processed asynchronously by the target function. Failed jobs retry with backoff based on the queue's backoff_ms setting. After all retries exhaust, jobs move to a dead letter queue where they are preserved for inspection or reprocessing.

Dead letter queues

See Manage Failed Triggers for DLQ configuration and redrive.

On this page