> ## Documentation Index
> Fetch the complete documentation index at: https://iii.dev/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Stream Real-Time Data

> How to push live updates to connected clients using the Stream worker.

## Goal

Push real-time updates (chat messages, notifications, live dashboards) to connected clients over WebSocket.

## Steps

### 1. Enable the Stream worker

```yaml title="iii-config.yaml" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
workers:
  - name: iii-stream
    config:
      # Port the stream WebSocket listens on (the client in step 4 connects to 3112).
      # Presumably STREAM_PORT overrides the 3112 fallback — confirm the ${VAR:default} syntax.
      port: ${STREAM_PORT:3112}
      host: 127.0.0.1
      adapter:
        name: kv
        config:
          store_method: file_based  # Options: in_memory, file_based
          file_path: ./data/stream_store  # required for file_based
        # Alternative adapter: uncomment to use redis instead of the kv adapter above.
        # name: redis
        # config:
        #   redis_url: redis://localhost:6379
```

### 2. Write to a stream

<Tabs>
  <Tab title="Node / TypeScript">
    ```typescript title="stream-writer.ts" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    import { registerWorker, Logger } from 'iii-sdk'

    // Connect this worker; III_URL overrides the default ws://localhost:49134 address.
    const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

    // Handler: writes one chat message into the `chat` stream.
    // The room ID is the stream group and a fresh UUID is the item ID.
    iii.registerFunction('chat::send', async (input) => {
      const logger = new Logger()
      const messageId = crypto.randomUUID()

      await iii.trigger({
        function_id: 'stream::set',
        payload: {
          stream_name: 'chat',
          group_id: input.roomId,
          item_id: messageId,
          data: { text: input.text, author: input.author },
        },
      })

      logger.info('Message sent to stream', { messageId })
      return { messageId }
    })

    // Then call from another function or worker
    const { messageId } = await iii.trigger({
      function_id: 'chat::send',
      payload: { roomId: 'room-123', text: 'Hello world', author: 'alice' },
    })
    // The handler's `logger` is local to the handler, so use a fresh Logger here.
    new Logger().info('Sent message', { messageId })
    ```
  </Tab>

  <Tab title="Python">
    ```python title="stream_writer.py" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    import os
    import uuid

    from iii import Logger, register_worker

    # Connect this worker; III_URL overrides the default ws://localhost:49134 address.
    iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


    def send_message(input):
        """Write one chat message to the ``chat`` stream and return its ID.

        Reads ``roomId``, ``text`` and ``author`` from ``input``. The room ID
        becomes the stream ``group_id`` and a freshly generated UUID becomes the
        ``item_id``.
        """
        logger = Logger()
        message_id = str(uuid.uuid4())

        iii.trigger({
            "function_id": "stream::set",
            "payload": {
                "stream_name": "chat",
                "group_id": input["roomId"],
                "item_id": message_id,
                "data": {"text": input["text"], "author": input["author"]},
            },
        })

        logger.info("Message sent to stream", {"messageId": message_id})
        return {"messageId": message_id}


    # Expose the handler under the function ID used by callers below.
    iii.register_function("chat::send", send_message)

    # Then call from another function or worker
    result = iii.trigger({
        "function_id": "chat::send",
        "payload": {"roomId": "room-123", "text": "Hello world", "author": "alice"},
    })
    print("Sent message:", result["messageId"])
    ```
  </Tab>

  <Tab title="Rust">
    ```rust title="stream_writer.rs" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    use iii_sdk::{register_worker, InitOptions, Logger, RegisterFunction, TriggerRequest};
    use serde_json::{json, Value};
    use tokio::signal;

    /// Registers the `chat::send` function, then waits for Ctrl-C before exiting.
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // III_URL overrides the default ws://127.0.0.1:49134 address.
        let url = std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string());
        let iii = register_worker(&url, InitOptions::default());

        // The `move` closure below takes ownership of this clone, leaving `iii`
        // usable for `register_function`.
        let iii_clone = iii.clone();
        let reg = RegisterFunction::new_async("chat::send", move |input: Value| {
            let iii = iii_clone.clone();
            async move {
                let logger = Logger::new();
                let message_id = uuid::Uuid::new_v4().to_string();
                // Missing or non-string fields fall back to an empty string.
                let room_id = input["roomId"].as_str().unwrap_or("");
                let text = input["text"].as_str().unwrap_or("");
                let author = input["author"].as_str().unwrap_or("");

                // Write the message to the `chat` stream: the room is the group,
                // the generated UUID is the item ID.
                iii.trigger(TriggerRequest {
                    function_id: "stream::set".into(),
                    payload: json!({
                        "stream_name": "chat",
                        "group_id": room_id,
                        "item_id": message_id,
                        "data": { "text": text, "author": author },
                    }),
                    action: None,
                    timeout_ms: None,
                })
                .await?;

                logger.info("Message sent to stream", Some(json!({ "messageId": message_id })));
                Ok(json!({ "messageId": message_id }))
            }
        });
        iii.register_function(reg);

        signal::ctrl_c().await?;
        Ok(())
    }

    // Then call from another function or worker
    // (fragment: these statements need an enclosing async fn with `iii` in scope)
    let result = iii
        .trigger(TriggerRequest {
            function_id: "chat::send".into(),
            payload: json!({ "roomId": "room-123", "text": "Hello world", "author": "alice" }),
            action: None,
            timeout_ms: None,
        })
        .await?;
    println!("Sent message: {}", result["messageId"]);
    ```
  </Tab>
</Tabs>

### 3. Read from a stream

<Tabs>
  <Tab title="Node / TypeScript">
    ```typescript title="stream-reader.ts" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    // Handler: lists the `chat` stream's items for the room given in `input.roomId`.
    iii.registerFunction('chat::list', async (input) => {
      const logger = new Logger()

      const messages = await iii.trigger({
        function_id: 'stream::list',
        payload: { stream_name: 'chat', group_id: input.roomId },
      })

      logger.info('Messages retrieved', { count: messages.length })
      return messages
    })

    // Then call from another function or worker
    const messages = await iii.trigger({
      function_id: 'chat::list',
      payload: { roomId: 'room-123' },
    })
    // The handler's `logger` is local to the handler, so use a fresh Logger here.
    new Logger().info('Messages', { messages })
    ```
  </Tab>

  <Tab title="Python">
    ```python title="stream_reader.py" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    def list_messages(input):
        """Return the ``chat`` stream's items for the room in ``input["roomId"]``.

        Calls ``stream::list`` with the room ID as ``group_id`` and logs how
        many items came back.
        """
        logger = Logger()

        messages = iii.trigger({
            "function_id": "stream::list",
            "payload": {"stream_name": "chat", "group_id": input["roomId"]},
        })

        logger.info("Messages retrieved", {"count": len(messages)})
        return messages


    # Expose the handler under the function ID used by callers below.
    iii.register_function("chat::list", list_messages)

    # Then call from another function or worker
    messages = iii.trigger({
        "function_id": "chat::list",
        "payload": {"roomId": "room-123"},
    })
    print("Messages:", messages)
    ```
  </Tab>

  <Tab title="Rust">
    ```rust title="stream_reader.rs" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    use iii_sdk::{RegisterFunction, TriggerRequest};
    use serde_json::{json, Value};

    // The `move` closure below takes ownership of this clone, leaving `iii`
    // usable for `register_function` and the call further down.
    let iii_clone = iii.clone();
    let reg = RegisterFunction::new_async("chat::list", move |input: Value| {
        let iii = iii_clone.clone();
        async move {
            let logger = Logger::new();
            // A missing or non-string `roomId` falls back to an empty string.
            let room_id = input["roomId"].as_str().unwrap_or("");

            // List the `chat` stream's items for this room.
            let messages = iii
                .trigger(TriggerRequest {
                    function_id: "stream::list".into(),
                    payload: json!({ "stream_name": "chat", "group_id": room_id }),
                    action: None,
                    timeout_ms: None,
                })
                .await?;

            // The count is logged as 0 when the result is not a JSON array.
            logger.info("Messages retrieved", Some(json!({ "count": messages.as_array().map(|a| a.len()).unwrap_or(0) })));
            Ok(messages)
        }
    });
    iii.register_function(reg);

    // Then call from another function or worker
    // Call the `chat::list` function registered above (not `stream::list`
    // directly), so the caller only needs to supply `roomId`.
    let messages = iii
        .trigger(TriggerRequest {
            function_id: "chat::list".into(),
            payload: json!({ "roomId": "room-123" }),
            action: None,
            timeout_ms: None,
        })
        .await?;
    println!("Messages: {:?}", messages);
    ```
  </Tab>
</Tabs>

### 4. Connect a client

Clients connect to the stream WebSocket endpoint to receive live updates:

```javascript title="client.js" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
// Port 3112 matches the Stream worker config from step 1; `chat` and `room-123`
// are the stream name and group ID used when writing in step 2.
const ws = new WebSocket('ws://127.0.0.1:3112/stream/chat/room-123')

// Each incoming frame is parsed as JSON before use.
ws.onmessage = (event) => {
  const update = JSON.parse(event.data)
  console.log('New message:', update)
}
```

### 5. React to stream events

Register triggers to run server-side logic when clients join or when stream data changes.

#### On client join

<Tabs>
  <Tab title="Node / TypeScript">
    ```typescript title="stream-triggers.ts" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    import { StreamJoinLeaveEvent } from 'iii-sdk/stream'

    // Handler: logs the stream, group and context of a join event.
    const onJoin = iii.registerFunction('chat::onJoin', async (input: StreamJoinLeaveEvent) => {
      const logger = new Logger()
      logger.info('Client joined chat room', {
        stream: input.stream_name,
        group: input.group_id,
        context: input.context,
      })
      return {}
    })

    // Bind the handler to the `stream:join` trigger type, with an empty config.
    iii.registerTrigger({
      type: 'stream:join',
      function_id: onJoin.id,
      config: {},
    })
    ```
  </Tab>

  <Tab title="Python">
    ```python title="stream_triggers.py" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    from iii import StreamJoinLeaveEvent

    def on_join(input: StreamJoinLeaveEvent):
        """Log the stream, group and context of a join event; returns an empty dict."""
        logger = Logger()
        logger.info("Client joined chat room", {
            "stream": input.stream_name,
            "group": input.group_id,
            "context": input.context,
        })
        return {}

    # Register the handler, then bind it to the `stream:join` trigger type.
    iii.register_function("chat::onJoin", on_join)
    iii.register_trigger({"type": "stream:join", "function_id": "chat::onJoin", "config": {}})
    ```
  </Tab>

  <Tab title="Rust">
    ```rust title="stream_triggers.rs" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    use iii_sdk::{IIITrigger, StreamJoinLeaveCallRequest, StreamJoinLeaveTriggerConfig};

    // Handler: logs the stream, group and context of a join event.
    let reg = RegisterFunction::new_async("chat::onJoin", move |input: Value| {
        async move {
            let logger = Logger::new();
            // Decode the raw JSON payload into the typed join/leave request;
            // a payload that does not match is returned as an error via `?`.
            let event: StreamJoinLeaveCallRequest = serde_json::from_value(input)?;
            logger.info("Client joined chat room", Some(json!({
                "stream": event.stream_name,
                "group": event.group_id,
                "context": event.context,
            })));
            Ok(json!({}))
        }
    });
    iii.register_function(reg);

    // Bind the handler to the stream-join trigger by function ID.
    iii.register_trigger(
        IIITrigger::StreamJoin(StreamJoinLeaveTriggerConfig::new()).for_function("chat::onJoin"),
    )?;
    ```
  </Tab>
</Tabs>

#### On message created

<Tabs>
  <Tab title="Node / TypeScript">
    ```typescript title="stream-triggers.ts" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    import { StreamChangeEvent } from 'iii-sdk/stream'

    // Handler: logs stream changes whose event type is `create`; other event
    // types are ignored.
    const onMessage = iii.registerFunction('chat::onMessage', async (input: StreamChangeEvent) => {
      const logger = new Logger()

      if (input.event.type === 'create') {
        logger.info('New message in room', { room: input.groupId, messageId: input.id, data: input.event.data })
      }

      return {}
    })

    // Bind the handler to the `stream` trigger type for the `chat` stream.
    iii.registerTrigger({
      type: 'stream',
      function_id: onMessage.id,
      config: { stream_name: 'chat' },
    })
    ```
  </Tab>

  <Tab title="Python">
    ```python title="stream_triggers.py" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    from iii import StreamChangeEvent

    def on_message(input: StreamChangeEvent):
        """Log stream changes whose event type is ``create``; ignore the rest.

        Always returns an empty dict.
        """
        logger = Logger()

        if input.event.type == "create":
            logger.info("New message in room", {
                "room": input.groupId,
                "messageId": input.id,
                "data": input.event.data,
            })

        return {}

    # Register the handler, then bind it to the `stream` trigger for the `chat` stream.
    iii.register_function("chat::onMessage", on_message)
    iii.register_trigger({
        "type": "stream",
        "function_id": "chat::onMessage",
        "config": {"stream_name": "chat"},
    })
    ```
  </Tab>

  <Tab title="Rust">
    ```rust title="stream_triggers.rs" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    use iii_sdk::{IIITrigger, StreamCallRequest, StreamEventType, StreamTriggerConfig};

    // Handler: logs stream changes whose event type is `Create`; other event
    // types are ignored.
    let reg = RegisterFunction::new_async("chat::onMessage", move |input: Value| {
        async move {
            let logger = Logger::new();
            // Decode the raw JSON payload into the typed stream request;
            // a payload that does not match is returned as an error via `?`.
            let event: StreamCallRequest = serde_json::from_value(input)?;

            if event.event.event_type == StreamEventType::Create {
                logger.info("New message in room", Some(json!({
                    "room": event.group_id,
                    "messageId": event.id,
                    "data": event.event.data,
                })));
            }

            Ok(json!({}))
        }
    });
    iii.register_function(reg);

    // Bind the handler to the stream trigger for the `chat` stream.
    iii.register_trigger(
        IIITrigger::Stream(StreamTriggerConfig::new().stream_name("chat")).for_function("chat::onMessage"),
    )?;
    ```
  </Tab>
</Tabs>

## Result

Any data written to the stream via `stream::set` is immediately pushed to all connected WebSocket clients subscribed to that stream and group. Server-side triggers let you react to joins, leaves, and data changes without polling.

<Info title="See also">
  For all stream operations, trigger payload shapes, and authentication, see the [Stream worker reference](../workers/iii-stream).
</Info>
