iii

PubSub

Topic-based publish/subscribe messaging for real-time event distribution.

Topic-based publish/subscribe messaging for broadcasting events to multiple subscribers in real time.

modules::pubsub::PubSubModule

Sample Configuration

# Sample entry for the PubSub module using the in-memory adapter.
- class: modules::pubsub::PubSubModule
  config:
    # Optional: when not specified, modules::pubsub::LocalAdapter (in-memory) is used.
    adapter:
      class: modules::pubsub::LocalAdapter

Configuration

adapter
Adapter

The adapter to use for pub/sub distribution. Defaults to modules::pubsub::LocalAdapter (in-memory) when not specified.

Adapters

modules::pubsub::LocalAdapter

In-memory pub/sub using broadcast channels. Messages are delivered only to subscribers running in the same engine process. No external dependencies required.

class: modules::pubsub::LocalAdapter

modules::pubsub::RedisAdapter

Uses Redis Pub/Sub as the backend. Enables event delivery across multiple engine instances.

# Selects the Redis Pub/Sub backed adapter.
class: modules::pubsub::RedisAdapter
config:
  # URL of the Redis instance to use.
  # NOTE(review): the ${REDIS_URL:...} form looks like an environment variable with a
  # fallback value after the colon — confirm against the engine's configuration docs.
  redis_url: ${REDIS_URL:redis://localhost:6379}

Configuration

redis_url
string

The URL of the Redis instance to use.

Functions

publish
function

Publish an event to a topic. The call's payload carries the topic to publish to and the data value to deliver; every function subscribed to that topic is invoked with that data value.

Trigger Type

This module adds a new Trigger Type: subscribe.

Subscribe Event Payload

The handler receives the raw value passed as data to the publish call. No envelope is added.

payload
any

The exact value published to the topic. Shape is determined entirely by the publisher.

Sample Code

// Topic name shared by the subscription and the publish call below.
const topic = 'orders.shipped'

// Subscriber handler: receives the value that was published as `data`.
const onOrderShipped = async (data) => {
  console.log('Order shipped:', data)
  return {}
}

const fn = iii.registerFunction({ id: 'notifications::onOrderShipped' }, onOrderShipped)

// Bind the registered function to the topic through a `subscribe` trigger.
iii.registerTrigger({ type: 'subscribe', function_id: fn.id, config: { topic } })

// Publish an order event to the topic; the handler above receives `shipment`.
const shipment = { orderId: 'abc-123', address: '123 Main St' }

await iii.trigger({
  function_id: 'publish',
  payload: { topic, data: shipment },
  action: TriggerAction.Void(),
})
# Identifiers shared by the registration, subscription and publish calls.
FUNCTION_ID = 'notifications::onOrderShipped'
TOPIC = 'orders.shipped'


def on_order_shipped(data):
    """Print the published value and return an empty result."""
    print('Order shipped:', data)
    return {}


iii.register_function({'id': FUNCTION_ID}, on_order_shipped)

# Subscribe the registered function to the topic.
iii.register_trigger({
    'type': 'subscribe',
    'function_id': FUNCTION_ID,
    'config': {'topic': TOPIC},
})

# Publish an order event to the topic.
shipment = {'orderId': 'abc-123', 'address': '123 Main St'}
iii.trigger({
    'function_id': 'publish',
    'payload': {'topic': TOPIC, 'data': shipment},
})
iii.register_function(
    RegisterFunctionMessage { id: "notifications::onOrderShipped".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None },
    |data| async move {
        println!("Order shipped: {:?}", data);
        Ok(json!({}))
    },
);

iii.register_trigger(RegisterTriggerInput {
    trigger_type: "subscribe".into(),
    function_id: "notifications::onOrderShipped".into(),
    config: json!({ "topic": "orders.shipped" }),
})?;

iii.trigger(TriggerRequest {
    function_id: "publish".into(),
    payload: json!({
        "topic": "orders.shipped",
        "data": { "orderId": "abc-123", "address": "123 Main St" }
    }),
    action: Some(TriggerAction::Void),
    timeout_ms: None,
}).await?;

Usage Example: Fanout Notification

One publisher triggers two independent subscribers on the same topic:

// Both subscribers listen on this topic.
const ordersShippedTopic = 'orders.shipped'

// Subscriber 1: e-mail alert for the shipped order.
const emailFn = iii.registerFunction({ id: 'notifications::sendEmailAlert' }, async (data) => {
  const { userId, orderId } = data
  await sendEmail(userId, `Order ${orderId} shipped`)
  return {}
})

// Subscriber 2: push alert for the shipped order.
const pushFn = iii.registerFunction({ id: 'notifications::sendPushAlert' }, async (data) => {
  const { userId, orderId } = data
  await sendPushNotification(userId, `Order ${orderId} shipped`)
  return {}
})

// Attach each subscriber to the topic: e-mail first, then push.
for (const subscriber of [emailFn, pushFn]) {
  iii.registerTrigger({
    type: 'subscribe',
    function_id: subscriber.id,
    config: { topic: ordersShippedTopic },
  })
}

// One publish call; both subscribers are registered on this topic.
await iii.trigger({
  function_id: 'publish',
  payload: {
    topic: ordersShippedTopic,
    data: { orderId: 'abc-123', userId: 'user-456' },
  },
})
def send_email_alert(data):
    """Send an e-mail alert for the shipped order and return an empty result."""
    user_id, order_id = data['userId'], data['orderId']
    send_email(user_id, f"Order {order_id} shipped")
    return {}


def send_push_alert(data):
    """Send a push alert for the shipped order and return an empty result."""
    user_id, order_id = data['userId'], data['orderId']
    send_push_notification(user_id, f"Order {order_id} shipped")
    return {}


# Function id -> handler, listed in registration order.
subscribers = {
    'notifications::sendEmailAlert': send_email_alert,
    'notifications::sendPushAlert': send_push_alert,
}

for function_id, handler in subscribers.items():
    iii.register_function({'id': function_id}, handler)

# Subscribe every registered function to the same topic.
for function_id in subscribers:
    iii.register_trigger({
        'type': 'subscribe',
        'function_id': function_id,
        'config': {'topic': 'orders.shipped'},
    })

# One publish call; both subscribers are registered on this topic.
order_event = {'orderId': 'abc-123', 'userId': 'user-456'}
iii.trigger({
    'function_id': 'publish',
    'payload': {'topic': 'orders.shipped', 'data': order_event},
})
use iii_sdk::{RegisterFunctionMessage, RegisterTriggerInput, TriggerRequest};
use serde_json::json;

iii.register_function(
    RegisterFunctionMessage { id: "notifications::sendEmailAlert".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None },
    |data| async move {
        let order_id = data["orderId"].as_str().unwrap_or("");
        let user_id = data["userId"].as_str().unwrap_or("");
        send_email(user_id, &format!("Order {} shipped", order_id)).await?;
        Ok(json!({}))
    },
);

iii.register_function(
    RegisterFunctionMessage { id: "notifications::sendPushAlert".into(), description: None, request_format: None, response_format: None, metadata: None, invocation: None },
    |data| async move {
        let order_id = data["orderId"].as_str().unwrap_or("");
        let user_id = data["userId"].as_str().unwrap_or("");
        send_push_notification(user_id, &format!("Order {} shipped", order_id)).await?;
        Ok(json!({}))
    },
);

iii.register_trigger(RegisterTriggerInput {
    trigger_type: "subscribe".into(),
    function_id: "notifications::sendEmailAlert".into(),
    config: json!({ "topic": "orders.shipped" }),
})?;
iii.register_trigger(RegisterTriggerInput {
    trigger_type: "subscribe".into(),
    function_id: "notifications::sendPushAlert".into(),
    config: json!({ "topic": "orders.shipped" }),
})?;

iii.trigger(TriggerRequest {
    function_id: "publish".into(),
    payload: json!({
        "topic": "orders.shipped",
        "data": { "orderId": "abc-123", "userId": "user-456" }
    }),
    action: None,
    timeout_ms: None,
}).await?;

PubSub vs Queue

| Feature | PubSub | Queue |
| --- | --- | --- |
| Delivery | Broadcast to all subscribers | Single consumer per message |
| Persistence | No (fire-and-forget) | Yes (with retries and DLQ) |
| Ordering | Not guaranteed | FIFO within topic |
| Best for | Real-time notifications, fanout | Reliable background processing |

PubSub Flow

On this page