PubSub
Topic-based publish/subscribe messaging for real-time event distribution.
Topic-based publish/subscribe messaging for broadcasting events to multiple subscribers in real time.
modules::pubsub::PubSubModuleSample Configuration
- class: modules::pubsub::PubSubModule
config:
adapter:
class: modules::pubsub::LocalAdapterConfiguration
The adapter to use for pub/sub distribution. Defaults to modules::pubsub::LocalAdapter (in-memory) when not specified.
Adapters
modules::pubsub::LocalAdapter
In-memory pub/sub using broadcast channels. Messages are delivered only to subscribers running in the same engine process. No external dependencies required.
class: modules::pubsub::LocalAdaptermodules::pubsub::RedisAdapter
Uses Redis Pub/Sub as the backend. Enables event delivery across multiple engine instances.
class: modules::pubsub::RedisAdapter
config:
redis_url: ${REDIS_URL:redis://localhost:6379}Configuration
Functions
Publish an event to a topic. All functions subscribed to that topic will be invoked with the payload.
Trigger Type
This module adds a new Trigger Type: subscribe.
Subscribe Event Payload
The handler receives the raw value passed as data to the publish call. No envelope is added.
Sample Code
const fn = iii.registerFunction(
{ id: 'notifications::onOrderShipped' },
async (data) => {
console.log('Order shipped:', data)
return {}
},
)
iii.registerTrigger({
type: 'subscribe',
function_id: fn.id,
config: { topic: 'orders.shipped' },
})
await iii.trigger({
function_id: 'publish',
payload: {
topic: 'orders.shipped',
data: { orderId: 'abc-123', address: '123 Main St' },
},
action: TriggerAction.Void(),
})def on_order_shipped(data):
print('Order shipped:', data)
return {}
iii.register_function({'id': 'notifications::onOrderShipped'}, on_order_shipped)
iii.register_trigger({'type': 'subscribe', 'function_id': 'notifications::onOrderShipped', 'config': {'topic': 'orders.shipped'}})
iii.trigger({
'function_id': 'publish',
'payload': {
'topic': 'orders.shipped',
'data': {'orderId': 'abc-123', 'address': '123 Main St'},
},
})iii.register_function(
RegisterFunctionMessage { id: "notifications::onOrderShipped".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None },
|data| async move {
println!("Order shipped: {:?}", data);
Ok(json!({}))
},
);
iii.register_trigger(RegisterTriggerInput {
trigger_type: "subscribe".into(),
function_id: "notifications::onOrderShipped".into(),
config: json!({ "topic": "orders.shipped" }),
})?;
iii.trigger(TriggerRequest {
function_id: "publish".into(),
payload: json!({
"topic": "orders.shipped",
"data": { "orderId": "abc-123", "address": "123 Main St" }
}),
action: Some(TriggerAction::Void),
timeout_ms: None,
}).await?;Usage Example: Fanout Notification
One publisher triggers two independent subscribers on the same topic:
const emailFn = iii.registerFunction(
{ id: 'notifications::sendEmailAlert' },
async (data) => {
await sendEmail(data.userId, `Order ${data.orderId} shipped`)
return {}
},
)
const pushFn = iii.registerFunction(
{ id: 'notifications::sendPushAlert' },
async (data) => {
await sendPushNotification(data.userId, `Order ${data.orderId} shipped`)
return {}
},
)
iii.registerTrigger({
type: 'subscribe',
function_id: emailFn.id,
config: { topic: 'orders.shipped' },
})
iii.registerTrigger({
type: 'subscribe',
function_id: pushFn.id,
config: { topic: 'orders.shipped' },
})
await iii.trigger({
function_id: 'publish',
payload: {
topic: 'orders.shipped',
data: { orderId: 'abc-123', userId: 'user-456' },
},
})def send_email_alert(data):
send_email(data['userId'], f"Order {data['orderId']} shipped")
return {}
def send_push_alert(data):
send_push_notification(data['userId'], f"Order {data['orderId']} shipped")
return {}
iii.register_function({'id': 'notifications::sendEmailAlert'}, send_email_alert)
iii.register_function({'id': 'notifications::sendPushAlert'}, send_push_alert)
iii.register_trigger({'type': 'subscribe', 'function_id': 'notifications::sendEmailAlert', 'config': {'topic': 'orders.shipped'}})
iii.register_trigger({'type': 'subscribe', 'function_id': 'notifications::sendPushAlert', 'config': {'topic': 'orders.shipped'}})
iii.trigger({
'function_id': 'publish',
'payload': {
'topic': 'orders.shipped',
'data': {'orderId': 'abc-123', 'userId': 'user-456'},
},
})use iii_sdk::{RegisterFunctionMessage, RegisterTriggerInput, TriggerRequest};
use serde_json::json;
iii.register_function(
RegisterFunctionMessage { id: "notifications::sendEmailAlert".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None },
|data| async move {
let order_id = data["orderId"].as_str().unwrap_or("");
let user_id = data["userId"].as_str().unwrap_or("");
send_email(user_id, &format!("Order {} shipped", order_id)).await?;
Ok(json!({}))
},
);
iii.register_function(
RegisterFunctionMessage { id: "notifications::sendPushAlert".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None },
|data| async move {
let order_id = data["orderId"].as_str().unwrap_or("");
let user_id = data["userId"].as_str().unwrap_or("");
send_push_notification(user_id, &format!("Order {} shipped", order_id)).await?;
Ok(json!({}))
},
);
iii.register_trigger(RegisterTriggerInput {
trigger_type: "subscribe".into(),
function_id: "notifications::sendEmailAlert".into(),
config: json!({ "topic": "orders.shipped" }),
})?;
iii.register_trigger(RegisterTriggerInput {
trigger_type: "subscribe".into(),
function_id: "notifications::sendPushAlert".into(),
config: json!({ "topic": "orders.shipped" }),
})?;
iii.trigger(TriggerRequest {
function_id: "publish".into(),
payload: json!({
"topic": "orders.shipped",
"data": { "orderId": "abc-123", "userId": "user-456" }
}),
action: None,
timeout_ms: None,
}).await?;PubSub vs Queue
| Feature | PubSub | Queue |
|---|---|---|
| Delivery | Broadcast to all subscribers | Single consumer per message |
| Persistence | No (fire-and-forget) | Yes (with retries and DLQ) |
| Ordering | Not guaranteed | FIFO within topic |
| Best for | Real-time notifications, fanout | Reliable background processing |