Queue
Queue module for async job processing with named queues, retries, and dead-letter support.
A module for asynchronous job processing. It supports two modes: topic-based queues (register a consumer per topic, emit events) and named queues (enqueue function calls via TriggerAction.Enqueue, no trigger registration).
modules::queue::QueueModule
How-to guidance
For step-by-step instructions, see Use Queues and Manage Failed Triggers.
Queue modes
Topic-based queues
Register a consumer for a topic and emit events to it.
- Register a consumer with registerTrigger({ type: 'queue', function_id: 'my::handler', config: { topic: 'order.created' } }). This subscribes the handler to that topic.
- Emit events by calling trigger({ function_id: 'enqueue', payload: { topic: 'order.created', data: payload } }), or trigger({ function_id: 'enqueue', payload: { topic, data }, action: TriggerAction.Void() }) for fire-and-forget. The enqueue function routes the payload to all subscribers of that topic.
- Action on the trigger: the handler receives data as its input. An optional queue_config on the trigger controls per-subscriber retries and concurrency.
The producer knows the topic name; consumers register to receive it. Queue settings can live at the trigger registration site.
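The fan-out described above can be modelled in a few lines. The sketch below is a self-contained in-memory stand-in, not the engine or its SDK: subscribe and emit are hypothetical names that imitate what registering a queue trigger for a topic and calling the enqueue function do, so the delivery to every subscriber is visible.

```typescript
// In-memory model of topic-based routing. This is NOT the engine's
// implementation: `subscribe` and `emit` are hypothetical stand-ins for
// registerTrigger({ type: 'queue', ... }) and
// trigger({ function_id: 'enqueue', ... }).

type Handler = (data: unknown) => void;

// topic name -> handlers registered for that topic
const subscribers: Record<string, Handler[]> = {};

// Mirrors registering a consumer with config: { topic }.
function subscribe(topic: string, handler: Handler): void {
  if (!subscribers[topic]) {
    subscribers[topic] = [];
  }
  subscribers[topic].push(handler);
}

// Mirrors the enqueue function: routes `data` to all subscribers of `topic`
// and returns how many handlers received it.
function emit(payload: { topic: string; data: unknown }): number {
  const handlers = subscribers[payload.topic] || [];
  for (const handler of handlers) {
    handler(payload.data); // each handler receives `data` as its input
  }
  return handlers.length;
}

// Two consumers on the same topic, one producer.
const received: string[] = [];
subscribe('order.created', (data) => {
  received.push('billing:' + JSON.stringify(data));
});
subscribe('order.created', (data) => {
  received.push('email:' + JSON.stringify(data));
});

const delivered = emit({ topic: 'order.created', data: { orderId: 42 } });
const ignored = emit({ topic: 'order.cancelled', data: {} });
```

Here delivered counts both handlers, while the event on order.cancelled reaches nobody because no consumer registered for that topic. Retries and concurrency are left out of the model.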
Named queues
Define queues in iii-config.yaml, then enqueue function calls directly. No trigger registration.
- Define queues in queue_configs (see Configuration).
- Enqueue a function call with trigger({ function_id: 'orders::process', payload, action: TriggerAction.Enqueue({ queue: 'payment' }) }). The engine routes the job to the named queue and invokes the function when a worker consumes it.
- Action on the trigger: the target function receives payload as its input. Retries, concurrency, and FIFO are configured in iii-config.yaml.
The producer targets the function and queue explicitly. Queue configuration is centralized.
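To make the contrast with topic-based queues concrete, here is a minimal in-memory sketch of the named-queue flow. It does not use the real SDK. The names enqueue, consumeOne, functions, and queues are hypothetical, and rejecting an unknown queue name is an assumption of this sketch rather than documented engine behaviour.

```typescript
// In-memory model of named queues: the producer names both the target
// function and the queue; no consumer registration is involved.
// All identifiers below are hypothetical, for illustration only.

type Job = { function_id: string; payload: unknown };
type Fn = (payload: unknown) => unknown;

// Functions known to the "engine", keyed by function id.
const functions: Record<string, Fn> = {};

// Queue names would come from queue_configs in iii-config.yaml.
const queues: Record<string, Job[]> = { default: [], payment: [] };

// Mirrors trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) }).
function enqueue(job: Job, queue: string): void {
  const jobs = queues[queue];
  if (!jobs) {
    // Assumption of this sketch: unknown queue names are rejected.
    throw new Error('unknown queue: ' + queue);
  }
  jobs.push(job);
}

// Mirrors a worker consuming one job: the target function is invoked
// with the job's payload as its input.
function consumeOne(queue: string): unknown {
  const jobs = queues[queue];
  const job = jobs ? jobs.shift() : undefined;
  if (!job) {
    return undefined; // nothing waiting on this queue
  }
  const fn = functions[job.function_id];
  if (!fn) {
    throw new Error('unknown function: ' + job.function_id);
  }
  return fn(job.payload);
}

functions['orders::process'] = (payload) =>
  'processed order ' + (payload as { orderId: number }).orderId;

enqueue({ function_id: 'orders::process', payload: { orderId: 7 } }, 'payment');
const result = consumeOne('payment');
const empty = consumeOne('payment');
```

The producer never mentions a topic: the job carries the function id, and the queue name only selects which configuration applies to it.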
When to use which
| | Topic-based | Named queues |
|---|---|---|
| Producer | Calls trigger({ function_id: 'enqueue', payload: { topic, data } }) | Calls trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) }) |
| Consumer | Registers registerTrigger({ type: 'queue', config: { topic } }) | No registration — function is the target |
| Config | Optional queue_config on trigger | queue_configs in iii-config.yaml |
| Use case | Pub/sub, multiple subscribers per topic | Direct function invocation with retries, FIFO, DLQ |
Both modes are valid. Named queues offer config-driven retries, concurrency, and FIFO. See Queue System for design rationale.
Sample Configuration
    - class: modules::queue::QueueModule
      config:
        queue_configs:
          default:
            max_retries: 5
            concurrency: 5
            type: standard
          payment:
            max_retries: 10
            concurrency: 2
            type: fifo
            message_group_field: transaction_id
        adapter:
          class: modules::queue::BuiltinQueueAdapter
          config:
            store_method: file_based
            file_path: ./data/queue_store

Configuration
queue_configs: A map of named queue configurations. Each key is the queue name referenced in TriggerAction.Enqueue({ queue: 'name' }). For the common case, define a queue named default and reference it as TriggerAction.Enqueue({ queue: 'default' }).
adapter: The transport adapter for queue persistence and distribution. Defaults to modules::queue::BuiltinQueueAdapter when not specified.
Queue Configuration
Each entry in queue_configs defines an independent named queue with its own retry, concurrency, and ordering settings.
max_retries: Maximum delivery attempts before routing the job to the dead-letter queue. Defaults to 3.
message_group_field: Required when type is fifo. The JSON field in the job payload whose value determines the ordering group. Jobs with the same group value are processed strictly in order. The field must be present and non-null in every enqueued payload.
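The grouping rule can be illustrated with a small sketch. It only models how an ordering group is read from a payload and how jobs fall into per-group sequences. groupKey and partitionByGroup are hypothetical helpers. Throwing on a missing field is this sketch's choice, since the text above does not say how the engine reacts.

```typescript
// Sketch of FIFO grouping by a payload field. Hypothetical helpers only;
// this is not the engine's implementation.

type Payload = Record<string, unknown>;

// Reads the ordering group from a payload. The field must be present and
// non-null; this sketch throws otherwise (the engine's reaction is not
// documented here).
function groupKey(payload: Payload, field: string): string {
  const value = payload[field];
  if (value === undefined || value === null) {
    throw new Error(`payload is missing required group field "${field}"`);
  }
  return String(value);
}

// Partitions jobs by group while preserving enqueue order inside each
// group, which is the order a FIFO queue must respect per group.
function partitionByGroup(jobs: Payload[], field: string): Record<string, Payload[]> {
  const groups: Record<string, Payload[]> = {};
  for (const job of jobs) {
    const key = groupKey(job, field);
    if (!groups[key]) {
      groups[key] = [];
    }
    groups[key].push(job);
  }
  return groups;
}

const jobs: Payload[] = [
  { transaction_id: 'tx-1', step: 1 },
  { transaction_id: 'tx-2', step: 1 },
  { transaction_id: 'tx-1', step: 2 },
];

const groups = partitionByGroup(jobs, 'transaction_id');
```

In this example the two tx-1 jobs stay in their enqueue order relative to each other, while tx-2 forms its own group and is not ordered against them.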
backoff_ms: Base retry backoff in milliseconds. Applied with exponential scaling: backoff_ms × 2^(attempt − 1). Defaults to 1000.
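As a worked example of the scaling formula, the helper below evaluates backoff_ms × 2^(attempt − 1) for the first few attempts. retryDelayMs is a hypothetical name, and treating attempt as a 1-based integer is an assumption read off the formula.

```typescript
// Evaluates backoff_ms * 2^(attempt - 1).
// `retryDelayMs` is a hypothetical helper; `attempt` is assumed to be a
// 1-based integer, as the formula suggests.
function retryDelayMs(backoffMs: number, attempt: number): number {
  if (!(attempt >= 1) || Math.floor(attempt) !== attempt) {
    throw new RangeError('attempt must be an integer >= 1');
  }
  return backoffMs * Math.pow(2, attempt - 1);
}

// Delays for attempts 1 through 4 with the default base of 1000 ms.
const schedule: number[] = [];
for (let attempt = 1; attempt <= 4; attempt++) {
  schedule.push(retryDelayMs(1000, attempt));
}
// schedule is [1000, 2000, 4000, 8000]
```

With the default base the delay doubles each time, so a queue with a high max_retries can keep a job waiting for a long while before it reaches the dead-letter queue.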
Adapters
modules::queue::BuiltinQueueAdapter
Built-in in-process queue. No external dependencies. Suitable for single-instance deployments — messages are not shared across engine instances.
    class: modules::queue::BuiltinQueueAdapter
    config:
      store_method: file_based  # in_memory | file_based
      file_path: ./data/queue_store  # required when store_method is file_based

store_method: Persistence strategy, either in_memory (lost on restart) or file_based (durable across restarts). Defaults to in_memory.
modules::queue::RedisAdapter
Uses Redis as the queue backend. Enables message distribution across multiple engine instances. Does not support retries or dead-letter queues.
    class: modules::queue::RedisAdapter
    config:
      redis_url: ${REDIS_URL:redis://localhost:6379}

modules::queue::RabbitMQAdapter
Uses RabbitMQ as the queue backend. Supports durable delivery, retries, and dead-letter queues across multiple engine instances.
The engine owns consumer loops, retry acknowledgement, and backoff logic — RabbitMQ is used as a transport only. Retry uses explicit ack + republish to a retry exchange with an x-attempt header, keeping compatibility with both classic and quorum queues.
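The ack-and-republish decision can be pictured with the following sketch. Only the x-attempt header and max_retries come from the description above. The function name, the Decision shape, and the convention that x-attempt starts at 1 and is absent on first delivery are assumptions made for illustration.

```typescript
// Sketch of the decision taken after a handler fails. In both outcomes the
// original delivery is acked; the message is then either republished to the
// retry exchange or routed to the dead-letter queue.
// All names are hypothetical except the x-attempt header.

type Headers = { [key: string]: number | undefined };

type Decision =
  | { action: 'republish'; headers: { 'x-attempt': number } }
  | { action: 'dead-letter' };

// Assumption: x-attempt counts deliveries starting at 1, and a message
// without the header is on its first attempt.
function onFailure(headers: Headers, maxRetries: number): Decision {
  const raw = headers['x-attempt'];
  const attempt = raw === undefined ? 1 : raw;
  if (attempt >= maxRetries) {
    // Delivery attempts are exhausted: stop retrying.
    return { action: 'dead-letter' };
  }
  // Republish with the attempt counter incremented.
  return { action: 'republish', headers: { 'x-attempt': attempt + 1 } };
}

const first = onFailure({}, 3); // first delivery failed
const last = onFailure({ 'x-attempt': 3 }, 3); // third delivery failed
```

Because the attempt count travels in a message header rather than in broker state, the same logic applies to classic and quorum queues alike.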
    class: modules::queue::RabbitMQAdapter
    config:
      amqp_url: ${RABBITMQ_URL:amqp://localhost:5672}

Adapter Comparison
| | BuiltinQueueAdapter | RabbitMQAdapter | RedisAdapter |
|---|---|---|---|
| Retries | Yes | Yes | No |
| Dead-letter queue | Yes | Yes | No |
| FIFO ordering | Yes | Yes | No |
| Named queues | Yes | Yes | No |
| Multi-instance | No | Yes | Yes |
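
The comparison table can also be read as data. The sketch below encodes it and filters adapters by the features a deployment needs. The Feature labels and the adaptersSupporting helper are hypothetical. The Yes/No values are copied from the table.

```typescript
// The adapter comparison table as a lookup. Feature labels and the helper
// are hypothetical; the true/false values mirror the table above.

type Feature = 'retries' | 'deadLetterQueue' | 'fifo' | 'namedQueues' | 'multiInstance';

type AdapterInfo = { name: string; supports: Record<Feature, boolean> };

const adapters: AdapterInfo[] = [
  {
    name: 'modules::queue::BuiltinQueueAdapter',
    supports: { retries: true, deadLetterQueue: true, fifo: true, namedQueues: true, multiInstance: false },
  },
  {
    name: 'modules::queue::RabbitMQAdapter',
    supports: { retries: true, deadLetterQueue: true, fifo: true, namedQueues: true, multiInstance: true },
  },
  {
    name: 'modules::queue::RedisAdapter',
    supports: { retries: false, deadLetterQueue: false, fifo: false, namedQueues: false, multiInstance: true },
  },
];

// Returns the adapters that support every required feature, in table order.
function adaptersSupporting(required: Feature[]): string[] {
  return adapters
    .filter((adapter) => required.every((feature) => adapter.supports[feature]))
    .map((adapter) => adapter.name);
}

const durableCluster = adaptersSupporting(['retries', 'multiInstance']);
const ordered = adaptersSupporting(['fifo']);
```

Asking for retries together with multi-instance distribution leaves only the RabbitMQ adapter, which matches the table.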