iii

Stream Real-Time Data

How to push live updates to connected clients using the Stream module.

Goal

Push real-time updates (chat messages, notifications, live dashboards) to connected clients over WebSocket.

Steps

1. Enable the Stream module

iii-config.yaml
# Enables the Stream module and selects the adapter that backs it.
modules:
  - class: modules::stream::StreamModule
    config:
      # NOTE(review): presumably resolved from the STREAM_PORT environment
      # variable with 3112 as the fallback — confirm the placeholder syntax.
      port: ${STREAM_PORT:3112}
      host: localhost
      adapter:
        # Active adapter: key-value store, configured here as file based.
        class: modules::stream::adapters::KvStore
        config:
          store_method: file_based  # Options: in_memory, file_based
          file_path: ./data/stream_store  # required for file_based
        # Alternative adapter, left commented out: swap it in for the
        # KvStore class/config pair above to use Redis instead.
        # class: modules::stream::adapters::RedisAdapter
        # config:
        #   redis_url: redis://localhost:6379

2. Write to a stream

stream-writer.ts
import { registerWorker, Logger } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

// Handler for 'chat::send': writes one chat message into the 'chat' stream,
// grouped by the caller's room, and returns the id generated for it.
iii.registerFunction({ id: 'chat::send' }, async (input) => {
  const logger = new Logger()
  const messageId = crypto.randomUUID()

  // Every message is stored under a fresh UUID inside the room's group.
  const payload = {
    stream_name: 'chat',
    group_id: input.roomId,
    item_id: messageId,
    data: { text: input.text, author: input.author },
  }
  await iii.trigger({ function_id: 'stream::set', payload })

  logger.info('Message sent to stream', { messageId })
  return { messageId }
})

// Then call from another function or worker.
// The logger created inside the 'chat::send' handler is local to that handler,
// so this call site needs its own instance before it can log.
const logger = new Logger()
const { messageId } = await iii.trigger({
  function_id: 'chat::send',
  payload: { roomId: 'room-123', text: 'Hello world', author: 'alice' },
})
logger.info('Sent message', { messageId })
stream_writer.py
import os
import uuid

from iii import Logger, register_worker

iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


def send_message(input):
    """Store one chat message in the ``chat`` stream and return its id.

    Args:
        input: Mapping with ``roomId``, ``text`` and ``author`` keys.

    Returns:
        A dict ``{"messageId": <generated UUID string>}``.
    """
    log = Logger()
    new_id = str(uuid.uuid4())

    # The room id is the stream group; the fresh UUID is the item key.
    set_request = {
        "function_id": "stream::set",
        "payload": {
            "stream_name": "chat",
            "group_id": input["roomId"],
            "item_id": new_id,
            "data": {"text": input["text"], "author": input["author"]},
        },
    }
    iii.trigger(set_request)

    log.info("Message sent to stream", {"messageId": new_id})
    return {"messageId": new_id}


# Register the handler under the id that callers pass as "function_id".
iii.register_function({"id": "chat::send"}, send_message)

# Then call from another function or worker
send_request = {
    "function_id": "chat::send",
    "payload": {"roomId": "room-123", "text": "Hello world", "author": "alice"},
}
result = iii.trigger(send_request)
print("Sent message:", result["messageId"])
stream_writer.rs
use iii_sdk::{register_worker, InitOptions, Logger, RegisterFunctionMessage, TriggerRequest};
use serde_json::{json, Value};
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string());
    let iii = register_worker(&url, InitOptions::default());

    let iii_clone = iii.clone();
    iii.register_function(
        RegisterFunctionMessage {
            id: "chat::send".into(),
            description: None,
            request_format: None,
            response_format: None,
            metadata: None,
            invocation: None,
        },
        move |input: Value| {
            let iii = iii_clone.clone();
            async move {
                let logger = Logger::new();
                let message_id = uuid::Uuid::new_v4().to_string();
                let room_id = input["roomId"].as_str().unwrap_or("");
                let text = input["text"].as_str().unwrap_or("");
                let author = input["author"].as_str().unwrap_or("");

                iii.trigger(TriggerRequest {
                    function_id: "stream::set".into(),
                    payload: json!({
                        "stream_name": "chat",
                        "group_id": room_id,
                        "item_id": message_id,
                        "data": { "text": text, "author": author },
                    }),
                    action: None,
                    timeout_ms: None,
                })
                .await?;

                logger.info("Message sent to stream", Some(json!({ "messageId": message_id })));
                Ok(json!({ "messageId": message_id }))
            }
        },
    );

    signal::ctrl_c().await?;
    Ok(())
}

// Then call from another function or worker
let send_request = TriggerRequest {
    function_id: "chat::send".into(),
    payload: json!({ "roomId": "room-123", "text": "Hello world", "author": "alice" }),
    action: None,
    timeout_ms: None,
};
let result = iii.trigger(send_request).await?;
println!("Sent message: {}", result["messageId"]);

3. Read from a stream

stream-reader.ts
// Handler for 'chat::list': fetches the items stored in the 'chat' stream for
// the caller's room and returns them unchanged.
iii.registerFunction({ id: 'chat::list' }, async (input) => {
  const logger = new Logger()

  const query = { stream_name: 'chat', group_id: input.roomId }
  const messages = await iii.trigger({ function_id: 'stream::list', payload: query })

  logger.info('Messages retrieved', { count: messages.length })
  return messages
})

// Then call from another function or worker.
// The logger created inside the 'chat::list' handler is local to that handler,
// so this call site needs its own instance before it can log.
const logger = new Logger()
const messages = await iii.trigger({
  function_id: 'chat::list',
  payload: { roomId: 'room-123' },
})
logger.info('Messages', { messages })
stream_reader.py
def list_messages(input):
    """Return the items stored in the ``chat`` stream for one room.

    Args:
        input: Mapping with a ``roomId`` key, used as the stream group id.

    Returns:
        Whatever ``stream::list`` returns for that group, unchanged.
    """
    log = Logger()

    list_request = {
        "function_id": "stream::list",
        "payload": {"stream_name": "chat", "group_id": input["roomId"]},
    }
    found = iii.trigger(list_request)

    log.info("Messages retrieved", {"count": len(found)})
    return found


# Register the handler under the id that callers pass as "function_id".
iii.register_function({"id": "chat::list"}, list_messages)

# Then call from another function or worker
list_request = {
    "function_id": "chat::list",
    "payload": {"roomId": "room-123"},
}
messages = iii.trigger(list_request)
print("Messages:", messages)
stream_reader.rs
// Register `chat::list`: returns whatever `stream::list` yields for the
// requested room in the `chat` stream.
let chat_list = RegisterFunctionMessage {
    id: "chat::list".into(),
    description: None,
    request_format: None,
    response_format: None,
    metadata: None,
    invocation: None,
};

// The handler needs its own handle to the worker so it can trigger `stream::list`.
let list_worker = iii.clone();
iii.register_function(chat_list, move |input: Value| {
    let iii = list_worker.clone();
    async move {
        let logger = Logger::new();
        // A missing or non-string room id falls back to the empty string.
        let room_id = input.get("roomId").and_then(Value::as_str).unwrap_or_default();

        let list_request = TriggerRequest {
            function_id: "stream::list".into(),
            payload: json!({ "stream_name": "chat", "group_id": room_id }),
            action: None,
            timeout_ms: None,
        };
        let messages = iii.trigger(list_request).await?;

        // Anything other than a JSON array is logged as zero items.
        let count = messages.as_array().map_or(0, Vec::len);
        logger.info("Messages retrieved", Some(json!({ "count": count })));
        Ok(messages)
    }
});

// Then call from another function or worker
let list_request = TriggerRequest {
    function_id: "chat::list".into(),
    payload: json!({ "roomId": "room-123" }),
    action: None,
    timeout_ms: None,
};
let messages = iii.trigger(list_request).await?;
println!("Messages: {:?}", messages);

4. Connect a client

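Clients connect to the stream WebSocket endpoint to receive live updates. In the example below, the URL uses the port from the Stream module config (3112), and its path ends with the stream name (`chat`) and the group ID (`room-123`) used in the previous steps: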

client.js
// Parses each frame pushed by the stream endpoint as JSON and logs it.
function handleStreamUpdate(event) {
  const update = JSON.parse(event.data)
  console.log('New message:', update)
}

const ws = new WebSocket('ws://localhost:3112/stream/chat/room-123')
ws.onmessage = handleStreamUpdate

Result

Any data written to the stream via `stream::set` is immediately pushed to all connected WebSocket clients subscribed to that stream and group. No polling is needed.

On this page