
Goal

Push real-time updates (chat messages, notifications, live dashboards) to connected clients over WebSocket.

Steps

1. Enable the Stream module

iii-config.yaml
modules:
  - class: modules::stream::StreamModule
    config:
      port: ${STREAM_PORT:3112}
      host: localhost
      adapter:
        class: modules::stream::adapters::KvStore
        config:
          store_method: file_based  # Options: in_memory, file_based
          file_path: ./data/stream_store  # required for file_based
        # class: modules::stream::adapters::RedisAdapter
        # config:
        #   redis_url: redis://localhost:6379
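
The port value `${STREAM_PORT:3112}` looks like the common `${NAME:default}` placeholder convention: use the `STREAM_PORT` environment variable if it is set, otherwise fall back to `3112`. The page does not spell out the exact rules, so the sketch below only illustrates that assumed behavior. `resolvePlaceholder` is a hypothetical helper and not part of iii.

```typescript
// Hypothetical helper, not part of iii: resolves "${NAME:default}" placeholders
// under the ASSUMPTION that they mean "environment variable NAME, else default".
function resolvePlaceholder(
  value: string,
  env: Record<string, string | undefined>,
): string {
  return value.replace(
    /\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}/g,
    // Prefer the environment value, then the inline default, then an empty string.
    (_match: string, name: string, fallback?: string) =>
      env[name] ?? fallback ?? '',
  )
}

// With no STREAM_PORT set, the inline default is used.
console.log(resolvePlaceholder('${STREAM_PORT:3112}', {}))
// With STREAM_PORT set, the environment value is used instead.
console.log(resolvePlaceholder('${STREAM_PORT:3112}', { STREAM_PORT: '4000' }))
```

Under that assumption, exporting `STREAM_PORT` before starting the engine would move the stream endpoint to another port without editing iii-config.yaml.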

2. Write to a stream

stream-writer.ts
import { registerWorker, Logger } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'chat::send' }, async (input) => {
  const logger = new Logger()
  const messageId = crypto.randomUUID()

  await iii.trigger({
    function_id: 'stream::set',
    payload: {
      stream_name: 'chat',
      group_id: input.roomId,
      item_id: messageId,
      data: { text: input.text, author: input.author },
    },
  })

  logger.info('Message sent to stream', { messageId })
  return { messageId }
})

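// Then call from another function or worker.
// The logger inside the handler above is scoped to that handler,
// so create one here before logging.
const logger = new Logger()

const { messageId } = await iii.trigger({
  function_id: 'chat::send',
  payload: { roomId: 'room-123', text: 'Hello world', author: 'alice' },
})
logger.info('Sent message', { messageId })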

3. Read from a stream

stream-reader.ts
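import { registerWorker, Logger } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'chat::list' }, async (input) => {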
  const logger = new Logger()

  const messages = await iii.trigger({
    function_id: 'stream::list',
    payload: { stream_name: 'chat', group_id: input.roomId },
  })

  logger.info('Messages retrieved', { count: messages.length })
  return messages
})

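// Then call from another function or worker.
// The logger inside the handler above is scoped to that handler,
// so create one here before logging.
const logger = new Logger()

const messages = await iii.trigger({
  function_id: 'chat::list',
  payload: { roomId: 'room-123' },
})
logger.info('Messages', { messages })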

4. Connect a client

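Clients connect to the Stream module's WebSocket endpoint (port 3112 in the configuration from step 1) to receive live updates. In this example the URL path names the stream (chat) and the group (room-123):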
client.js
const ws = new WebSocket('ws://localhost:3112/stream/chat/room-123')

ws.onmessage = (event) => {
  const update = JSON.parse(event.data)
  console.log('New message:', update)
}
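
The client above hardcodes the URL and assumes every frame is valid JSON. The sketch below shows one way to build the URL from its parts and to parse frames defensively. It assumes the path pattern `/stream/<stream_name>/<group_id>`, which is inferred from the example URL and not confirmed elsewhere on this page. `streamUrl` and `parseUpdate` are hypothetical helpers, not part of iii-sdk.

```typescript
// Hypothetical helper: builds the endpoint URL, ASSUMING the path pattern
// /stream/<stream_name>/<group_id> seen in the example above.
function streamUrl(base: string, streamName: string, groupId: string): string {
  const trimmed = base.replace(/\/+$/, '') // drop trailing slashes from the base
  return `${trimmed}/stream/${encodeURIComponent(streamName)}/${encodeURIComponent(groupId)}`
}

// Hypothetical helper: returns the parsed frame, or undefined if it is not valid JSON,
// so one malformed frame does not throw inside the message handler.
function parseUpdate(raw: string): unknown {
  try {
    return JSON.parse(raw)
  } catch {
    return undefined
  }
}

// Usage in a browser or any runtime with a global WebSocket:
//   const ws = new WebSocket(streamUrl('ws://localhost:3112', 'chat', 'room-123'))
//   ws.onmessage = (event) => {
//     const update = parseUpdate(String(event.data))
//     if (update !== undefined) console.log('New message:', update)
//   }
console.log(streamUrl('ws://localhost:3112', 'chat', 'room-123'))
```

Encoding each path segment keeps room IDs that contain spaces or slashes from changing the shape of the URL.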

Result

Any data written to the stream via stream::set is immediately pushed to all connected WebSocket clients subscribed to that stream and group. No polling needed.
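
To make the stream, group, and item relationship concrete, here is a toy in-memory model of the behavior described above: a write stores the item under its stream and group, then notifies only the subscribers of that same stream and group. It is an illustration under stated assumptions, not how iii implements streams. `ToyStreamStore` and its methods are hypothetical names.

```typescript
type Listener = (itemId: string, data: unknown) => void

// Toy model for illustration only; not iii's implementation.
class ToyStreamStore {
  private items = new Map<string, Map<string, unknown>>()
  private listeners = new Map<string, Set<Listener>>()

  // One key per (stream, group) pair.
  private key(streamName: string, groupId: string): string {
    return JSON.stringify([streamName, groupId])
  }

  // Plays the role of stream::set: store the item, then push it to subscribers.
  set(streamName: string, groupId: string, itemId: string, data: unknown): void {
    const k = this.key(streamName, groupId)
    let group = this.items.get(k)
    if (!group) {
      group = new Map<string, unknown>()
      this.items.set(k, group)
    }
    group.set(itemId, data)
    this.listeners.get(k)?.forEach((listener) => listener(itemId, data))
  }

  // Plays the role of stream::list: return every item in the group.
  list(streamName: string, groupId: string): unknown[] {
    const group = this.items.get(this.key(streamName, groupId))
    return group ? Array.from(group.values()) : []
  }

  // Plays the role of a connected WebSocket client; returns an unsubscribe function.
  subscribe(streamName: string, groupId: string, listener: Listener): () => void {
    const k = this.key(streamName, groupId)
    let set = this.listeners.get(k)
    if (!set) {
      set = new Set<Listener>()
      this.listeners.set(k, set)
    }
    set.add(listener)
    return () => {
      set.delete(listener)
    }
  }
}

const store = new ToyStreamStore()
store.subscribe('chat', 'room-123', (id, data) => console.log('room-123 got', id, data))
store.subscribe('chat', 'room-456', (id, data) => console.log('room-456 got', id, data))
// Only the room-123 subscriber is notified by this write.
store.set('chat', 'room-123', 'msg-1', { text: 'Hello world', author: 'alice' })
```

In this model a subscriber to room-456 never sees writes to room-123, which matches the "subscribed to that stream and group" wording above.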