
Goal

Stream binary data between functions that may run in different worker processes. Channels give you Node.js-style readable and writable streams backed by WebSocket connections through the engine, so you can move large payloads incrementally instead of cramming everything into a single JSON message.

What Are Channels

Channels are a streaming primitive built into the iii engine. They let one function write data while another function reads it — in real time, across process and language boundaries. Every channel has two ends:
  • A writer — exposes a Writable stream for sending binary data.
  • A reader — exposes a Readable stream for receiving binary data.
Each end also has a ref — a small, JSON-serializable token (StreamChannelRef) that you embed inside a trigger() payload. When the receiving function deserializes the ref, the SDK automatically connects it to the engine and materializes a live ChannelWriter or ChannelReader.

A typical flow: Function A creates a channel, sends readerRef to Function B through a normal trigger() call, then writes chunks to the writer stream. The engine pipes each chunk over WebSocket to Function B’s reader stream. When A ends the writer, B’s reader emits end.
For internal details on the WebSocket framing, backpressure handling, and lazy connection behavior, see the Channels architecture reference.
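The handoff follows ordinary Node.js stream semantics. A minimal local sketch of the write/read lifecycle, using a PassThrough in place of an engine-backed channel (no engine or SDK involved here):

```typescript
import { PassThrough } from 'node:stream'

// A local PassThrough stands in for the channel: one side writes,
// the other side reads the same byte stream.
const stream = new PassThrough()

async function consume(readable: AsyncIterable<Buffer | string>): Promise<string> {
  const chunks: Buffer[] = []
  for await (const chunk of readable) {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
  }
  // The loop exits when the writer calls end() -- the reader sees 'end'.
  return Buffer.concat(chunks).toString('utf-8')
}

const done = consume(stream)
stream.write(Buffer.from('first '))
stream.end(Buffer.from('second'))

done.then((text) => console.log(text)) // prints "first second"
```

With a real channel, the writer and reader ends live in different processes and the bytes travel through the engine, but the stream API on each side looks the same.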

Why Channels Are Necessary

Function invocations in iii pass data as JSON messages. This works for structured payloads, but falls apart when you need to:
  • Transfer large binary data — files, images, PDFs, datasets. Serializing a 100 MB file as a JSON field is impractical and blows up memory.
  • Stream data incrementally — progress updates, partial results, or data that is produced over time. JSON messages are all-or-nothing.
  • Stream HTTP responses — serving file downloads, SSE streams, or chunked responses to HTTP clients requires writing data progressively to the response body.
  • Pipeline processing — chaining producer and consumer functions where the consumer starts processing before the producer finishes.
Channels solve all of these by giving each side a real stream backed by the engine’s WebSocket infrastructure.

When to Use Channels

| Scenario | Use channels? | Why |
| --- | --- | --- |
| Serving a file download from an HTTP endpoint | Yes | Stream the file to the HTTP response without buffering the entire file in memory |
| Processing a large CSV upload | Yes | Read the uploaded body as a stream and parse rows incrementally |
| Sending progress updates during a long-running task | Yes | Use sendMessage() on the writer for text-based progress alongside binary data |
| Piping data between a producer and consumer function | Yes | The consumer can start working before the producer finishes |
| Returning a small JSON result from a function | No | A regular trigger() return value is simpler and sufficient |
| Passing a config object to another function | No | Put it in the trigger() payload directly |
| Fire-and-forget notifications | No | Use TriggerAction.Void() or a queue instead |
Rule of thumb: if your data is small enough to fit comfortably in a JSON payload (< 1 MB) and you don’t need incremental delivery, use a regular trigger() call. Use channels when you need streaming, binary data, or when the payload is too large to serialize at once.

Steps

1. Create a channel

Call createChannel() on the SDK instance. This returns a channel object containing both local stream objects and their serializable refs.
const channel = await iii.createChannel()

// channel.writer    — ChannelWriter (local writable stream)
// channel.reader    — ChannelReader (local readable stream)
// channel.writerRef — StreamChannelRef (serializable, pass to another function)
// channel.readerRef — StreamChannelRef (serializable, pass to another function)
Creating a channel is cheap — the WebSocket connection is established lazily on first read or write.
2. Pass the ref to another function

Embed a ref (either readerRef or writerRef) inside the trigger() payload. The SDK on the receiving side automatically materializes it into a live ChannelReader or ChannelWriter.
const result = await iii.trigger({
  function_id: 'files::process',
  payload: { filename: 'report.csv', reader: channel.readerRef },
})
The ref is a plain JSON object with three fields — channel_id, access_key, and direction — so it survives serialization across languages and processes.
3. Write data to the channel

Use the writer’s stream to send binary data. Data is automatically chunked into 64 KB frames over the WebSocket.
channel.writer.stream.write(Buffer.from('first chunk'))
channel.writer.stream.write(Buffer.from('second chunk'))
channel.writer.stream.end() // signals completion
4. Read data from the channel

On the receiving side, iterate over the reader stream to consume chunks as they arrive.
const chunks: Buffer[] = []
for await (const chunk of input.reader.stream) {
  chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
}
const data = Buffer.concat(chunks)
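If the payload should land on disk rather than in memory, the reader stream can be piped straight into a file. A sketch using a PassThrough in place of input.reader.stream (from the consumer's side, an engine-backed channel behaves the same way):

```typescript
import { PassThrough } from 'node:stream'
import { pipeline } from 'node:stream/promises'
import * as fs from 'node:fs'
import * as os from 'node:os'
import * as path from 'node:path'

// PassThrough stands in for input.reader.stream.
const reader = new PassThrough()
const outPath = path.join(os.tmpdir(), 'channel-demo.bin')

// pipeline handles backpressure and closes the file when the stream ends.
const copied = pipeline(reader, fs.createWriteStream(outPath))

reader.write(Buffer.from('chunk-1 '))
reader.end(Buffer.from('chunk-2'))
```

pipeline also propagates errors in either stream, so a failed transfer rejects instead of leaving a half-open file handle.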

Real-World Examples

Example 1: File Download via HTTP Endpoint

A common use case is serving file downloads from an HTTP endpoint. The http() helper gives you a response object with a writable stream — pipe data directly to it without buffering the entire file in memory.
import { registerWorker, http } from 'iii-sdk'
import type { HttpRequest, HttpResponse } from 'iii-sdk'
import * as fs from 'node:fs'
import { pipeline } from 'node:stream/promises'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction(
  { id: 'files::download' },
  http(async (req: HttpRequest, res: HttpResponse) => {
    const filePath = `/data/reports/${req.path_params.filename}`

    res.status(200)
    res.headers({
      'content-type': 'application/octet-stream',
      'content-disposition': `attachment; filename="${req.path_params.filename}"`,
    })

    await pipeline(fs.createReadStream(filePath), res.stream)
  }),
)

iii.registerTrigger({
  type: 'http',
  function_id: 'files::download',
  config: { api_path: 'files/:filename', http_method: 'GET' },
})
The http() wrapper in Node/TypeScript gives you direct access to the underlying channel writer as res.stream, so the file streams byte-by-byte from disk to the HTTP client without ever being fully buffered in memory.
curl -O http://localhost:3111/files/quarterly-report.pdf

Example 2: Server-Sent Events (SSE) Streaming

Channels power SSE endpoints where you push events to the client over time. Write each SSE frame to the response stream and the client receives them as they arrive.
import { registerWorker, http } from 'iii-sdk'
import type { HttpRequest, HttpResponse } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction(
  { id: 'events::stream' },
  http(async (_req: HttpRequest, res: HttpResponse) => {
    res.status(200)
    res.headers({
      'content-type': 'text/event-stream',
      'cache-control': 'no-cache',
      'connection': 'keep-alive',
    })

    for (let i = 1; i <= 5; i++) {
      const frame = `id: ${i}\nevent: progress\ndata: ${JSON.stringify({ step: i, total: 5 })}\n\n`
      res.stream.write(Buffer.from(frame))
      await new Promise((r) => setTimeout(r, 1000))
    }

    const done = `id: 6\nevent: done\ndata: ${JSON.stringify({ message: 'complete' })}\n\n`
    res.stream.write(Buffer.from(done))
    res.stream.end()
  }),
)

iii.registerTrigger({
  type: 'http',
  function_id: 'events::stream',
  config: { api_path: 'events/stream', http_method: 'GET' },
})
The client connects and receives events as they’re written:
curl -N http://localhost:3111/events/stream
# id: 1
# event: progress
# data: {"step":1,"total":5}
#
# id: 2
# event: progress
# data: {"step":2,"total":5}
# ...

Example 3: Streaming Data Between Functions

When one function produces data and another processes it, channels let the consumer start working before the producer finishes. This is the classic pipeline pattern.
import { registerWorker } from 'iii-sdk'
import type { ChannelReader } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction(
  { id: 'pipeline::consumer' },
  async (input: { label: string; reader: ChannelReader }) => {
    const chunks: Buffer[] = []
    for await (const chunk of input.reader.stream) {
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
    }

    const records = JSON.parse(Buffer.concat(chunks).toString('utf-8'))
    const total = records.reduce((sum: number, r: { value: number }) => sum + r.value, 0)

    return { label: input.label, count: records.length, total }
  },
)

iii.registerFunction(
  { id: 'pipeline::producer' },
  async (input: { records: { name: string; value: number }[] }) => {
    const channel = await iii.createChannel()

    const writePromise = new Promise<void>((resolve, reject) => {
      channel.writer.stream.end(
        Buffer.from(JSON.stringify(input.records)),
        (err?: Error | null) => (err ? reject(err) : resolve()),
      )
    })

    const result = await iii.trigger({
      function_id: 'pipeline::consumer',
      payload: { label: 'batch-001', reader: channel.readerRef },
    })

    await writePromise
    return result
  },
)

Example 4: Bidirectional Streaming with Progress

When two functions need to exchange data in both directions, create two channels. The writer’s sendMessage() method provides a side-channel for text-based progress or metadata that doesn’t mix with the binary stream.
import { registerWorker } from 'iii-sdk'
import type { ChannelReader, ChannelWriter } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction(
  { id: 'transform::worker' },
  async (input: { reader: ChannelReader; writer: ChannelWriter }) => {
    const chunks: Buffer[] = []
    let count = 0

    for await (const chunk of input.reader.stream) {
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
      count++
      input.writer.sendMessage(JSON.stringify({ type: 'progress', chunks: count }))
    }

    const result = Buffer.concat(chunks).toString('utf-8').toUpperCase()
    input.writer.stream.end(Buffer.from(result))

    return { status: 'done' }
  },
)

iii.registerFunction(
  { id: 'transform::coordinator' },
  async (input: { text: string }) => {
    const inputChannel = await iii.createChannel()
    const outputChannel = await iii.createChannel()

    const progress: unknown[] = []
    outputChannel.reader.onMessage((msg) => progress.push(JSON.parse(msg)))

    inputChannel.writer.stream.end(Buffer.from(input.text))

    const triggerPromise = iii.trigger({
      function_id: 'transform::worker',
      payload: {
        reader: inputChannel.readerRef,
        writer: outputChannel.writerRef,
      },
    })

    const resultChunks: Buffer[] = []
    for await (const chunk of outputChannel.reader.stream) {
      resultChunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
    }

    await triggerPromise
    return {
      result: Buffer.concat(resultChunks).toString('utf-8'),
      progress,
    }
  },
)
The coordinator creates two channels — one for input, one for output. The worker reads input, sends progress updates via sendMessage(), and writes the transformed result to the output channel. The coordinator collects both progress messages and the binary result.

API Quick Reference

createChannel()

Returns an object with four properties:
| Property | Type | Description |
| --- | --- | --- |
| writer | ChannelWriter | Local writable stream for sending data |
| reader | ChannelReader | Local readable stream for receiving data |
| writerRef | StreamChannelRef | Serializable token — pass to another function so it can write |
| readerRef | StreamChannelRef | Serializable token — pass to another function so it can read |

ChannelWriter

| Capability | Node / TypeScript | Python | Rust |
| --- | --- | --- | --- |
| Write binary data | writer.stream.write(data) | writer.write(data) | writer.write(&data).await |
| Send text message | writer.sendMessage(msg) | writer.send_message(msg) | writer.send_message(&msg).await |
| Close | writer.stream.end() | writer.close() | writer.close().await |

ChannelReader

| Capability | Node / TypeScript | Python | Rust |
| --- | --- | --- | --- |
| Read as stream | for await (const chunk of reader.stream) | async for chunk in reader | reader.next_binary().await |
| Read all at once | Collect chunks manually | reader.read_all() | reader.read_all().await |
| Listen for text messages | reader.onMessage(callback) | reader.on_message(callback) | reader.on_message(callback).await |
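For the "collect chunks manually" case in Node/TypeScript, a small helper covers it. readAll here is a hypothetical utility, not an iii-sdk export:

```typescript
import { Readable } from 'node:stream'

// Collect an async-iterable chunk stream into a single Buffer.
// readAll is a hypothetical helper, not part of iii-sdk.
async function readAll(stream: AsyncIterable<Buffer | string>): Promise<Buffer> {
  const chunks: Buffer[] = []
  for await (const chunk of stream) {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
  }
  return Buffer.concat(chunks)
}

// Demo with a plain Readable; a ChannelReader's stream is consumed the same way.
const demo = readAll(Readable.from([Buffer.from('hello, '), Buffer.from('world')]))
```

Note that buffering everything defeats the point of streaming for large payloads; reserve this for data you genuinely need in one piece.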

StreamChannelRef

A plain JSON object that survives serialization:
type StreamChannelRef = {
  channel_id: string
  access_key: string
  direction: 'read' | 'write'
}

Common Pitfalls

If you forget to call writer.stream.end() (or writer.close() in Python/Rust), the reader will hang waiting for more data. Always end the writer when you’re done.
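One way to make that guarantee structural is a try/finally around the producer loop. A sketch with a PassThrough standing in for channel.writer.stream:

```typescript
import { PassThrough } from 'node:stream'

// PassThrough stands in for channel.writer.stream.
async function produce(writable: PassThrough, items: string[]): Promise<void> {
  try {
    for (const item of items) {
      writable.write(Buffer.from(item))
    }
  } finally {
    // Runs on success and on error alike, so the reader never hangs.
    writable.end()
  }
}

const stream = new PassThrough()
const finished = produce(stream, ['a', 'b', 'c'])
```

If the producer can fail partway, consider also surfacing the error to the consumer (e.g. via sendMessage()) so it can distinguish a clean end from a truncated stream.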
Only pass readerRef or writerRef (the serializable tokens) inside trigger() payloads. The local reader and writer objects are not serializable and cannot cross process boundaries.
If you write data faster than the reader consumes it, backpressure is handled automatically — the WebSocket pauses when the buffer is full. In Node.js, respect the false return from writer.stream.write() and wait for the drain event before writing more.
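A drain-aware write loop, sketched against a plain Writable (channel.writer.stream follows the same contract); the tiny highWaterMark and slow sink exist only to force backpressure in the demo:

```typescript
import { Writable } from 'node:stream'
import { once } from 'node:events'

// A deliberately slow sink with a tiny buffer, to force backpressure.
const received: Buffer[] = []
const sink = new Writable({
  highWaterMark: 4,
  write(chunk, _encoding, callback) {
    received.push(chunk as Buffer)
    setTimeout(callback, 5) // simulate a slow consumer
  },
})

// Pause whenever write() reports a full buffer, resume on 'drain'.
async function writeAll(writable: Writable, chunks: Buffer[]): Promise<void> {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      await once(writable, 'drain')
    }
  }
  writable.end()
}

const done = writeAll(sink, [Buffer.from('aaaa'), Buffer.from('bbbb'), Buffer.from('cccc')])
```

For long producer loops, this pattern keeps memory bounded even when the remote reader is much slower than the writer.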

Next Steps

  • Channels Architecture: internal design (WebSocket framing, backpressure, and lazy connections)
  • Expose an HTTP Endpoint: register a function as a REST API endpoint
  • Use Functions & Triggers: learn how to register and trigger functions across languages
  • Stream Real-Time Data: push real-time updates to connected clients