How to stream binary data between functions using channels — including file transfers, HTTP streaming responses, progress reporting, and bidirectional communication.
Stream binary data between functions that may run in different worker processes. Channels give you Node.js-style readable and writable streams backed by WebSocket connections through the engine, so you can move large payloads incrementally instead of cramming everything into a single JSON message.
Channels are a streaming primitive built into the iii engine. They let one function write data while another function reads it — in real time, across process and language boundaries. Every channel has two ends:
A writer — exposes a Writable stream for sending binary data.
A reader — exposes a Readable stream for receiving binary data.
Each end also has a ref — a small, JSON-serializable token (StreamChannelRef) that you embed inside a trigger() payload. When the receiving function deserializes the ref, the SDK automatically connects it to the engine and materializes a live ChannelWriter or ChannelReader.

Function A creates a channel, sends readerRef to Function B through a normal trigger() call, then writes chunks to the writer stream. The engine pipes each chunk over WebSocket to Function B's reader stream. When A ends the writer, B's reader emits end.
For internal details on the WebSocket framing, backpressure handling, and lazy connection behavior, see the Channels architecture reference.
Function invocations in iii pass data as JSON messages. This works for structured payloads, but falls apart when you need to:
Transfer large binary data — files, images, PDFs, datasets. Serializing a 100 MB file as a JSON field is impractical and blows up memory.
Stream data incrementally — progress updates, partial results, or data that is produced over time. JSON messages are all-or-nothing.
Stream HTTP responses — serving file downloads, SSE streams, or chunked responses to HTTP clients requires writing data progressively to the response body.
Pipeline processing — chaining producer and consumer functions where the consumer starts processing before the producer finishes.
Channels solve all of these by giving each side a real stream backed by the engine’s WebSocket infrastructure.
| Use case | Channels? | Why |
| --- | --- | --- |
| Serving a file download from an HTTP endpoint | Yes | Stream the file to the HTTP response without buffering the entire file in memory |
| Processing a large CSV upload | Yes | Read the uploaded body as a stream and parse rows incrementally |
| Sending progress updates during a long-running task | Yes | Use sendMessage() on the writer for text-based progress alongside binary data |
| Piping data between a producer and consumer function | Yes | The consumer can start working before the producer finishes |
| Returning a small JSON result from a function | No | A regular trigger() return value is simpler and sufficient |
| Passing a config object to another function | No | Put it in the trigger() payload directly |
| Fire-and-forget notifications | No | Use TriggerAction.Void() or a queue instead |
Rule of thumb: if your data is small enough to fit comfortably in a JSON payload (< 1 MB) and you don’t need incremental delivery, use a regular trigger() call. Use channels when you need streaming, binary data, or when the payload is too large to serialize at once.
1. Create a channel

Creating a channel is cheap — the WebSocket connection is established lazily on first read or write.
2. Pass the ref to another function
Embed a ref (either readerRef or writerRef) inside the trigger() payload. The SDK on the receiving side automatically materializes it into a live ChannelReader or ChannelWriter.
```rust
let result = iii.trigger(TriggerRequest::new("files::process", json!({
    "filename": "report.csv",
    "reader": channel.reader_ref,
}))).await?;
```
The ref is a plain JSON object with three fields — channel_id, access_key, and direction — so it survives serialization across languages and processes.
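As a sketch of that round-trip, a ref survives ordinary JSON serialization like any plain object (the field names match the description above; the values here are invented for illustration — a real ref is issued by the engine):

```typescript
// Hypothetical example values — a real StreamChannelRef comes from createChannel().
const readerRef = {
  channel_id: 'ch_01HXYZ',    // made-up id for illustration
  access_key: 'secret-token', // made-up key for illustration
  direction: 'read' as const,
}

// Embedding the ref in a trigger() payload is plain JSON serialization...
const payload = JSON.stringify({ filename: 'report.csv', reader: readerRef })

// ...and the receiving side gets the same three fields back intact,
// ready for the SDK to materialize into a live ChannelReader.
const received = JSON.parse(payload).reader
```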
3. Write data to the channel
Use the writer’s stream to send binary data. Data is automatically chunked into 64 KB frames over the WebSocket.
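To make the framing concrete, here is an illustrative sketch of how a payload splits into 64 KB frames. The helper is hypothetical — the SDK performs this chunking for you on every write:

```typescript
const FRAME_SIZE = 64 * 1024 // 64 KB, matching the framing described above

// Illustrative helper only: the SDK chunks writes like this automatically.
function toFrames(payload: Buffer, frameSize = FRAME_SIZE): Buffer[] {
  const frames: Buffer[] = []
  for (let offset = 0; offset < payload.length; offset += frameSize) {
    // subarray() creates views, so no data is copied while framing.
    frames.push(payload.subarray(offset, offset + frameSize))
  }
  return frames
}
```

A 150 KB write, for example, would go out as two full 64 KB frames plus a 22 KB remainder.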
A common use case is serving file downloads from an HTTP endpoint. The http() helper gives you a response object with a writable stream — pipe data directly to it without buffering the entire file in memory.
Node / TypeScript
```typescript
import { registerWorker, http } from 'iii-sdk'
import type { HttpRequest, HttpResponse } from 'iii-sdk'
import * as fs from 'node:fs'
import { pipeline } from 'node:stream/promises'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction(
  { id: 'files::download' },
  http(async (req: HttpRequest, res: HttpResponse) => {
    const filePath = `/data/reports/${req.path_params.filename}`
    res.status(200)
    res.headers({
      'content-type': 'application/octet-stream',
      'content-disposition': `attachment; filename="${req.path_params.filename}"`,
    })
    await pipeline(fs.createReadStream(filePath), res.stream)
  }),
)

iii.registerTrigger({
  type: 'http',
  function_id: 'files::download',
  config: { api_path: 'files/:filename', http_method: 'GET' },
})
```
The http() wrapper in Node/TypeScript gives you direct access to the underlying channel writer as res.stream, so the file streams byte-by-byte from disk to the HTTP client without ever being fully buffered in memory.
Channels power SSE endpoints where you push events to the client over time. Write each SSE frame to the response stream and the client receives them as they arrive.
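For illustration, here is a minimal sketch of the SSE framing itself. A PassThrough stands in for res.stream (which is also a Writable, so the write calls look identical), and the event payloads are invented:

```typescript
import { PassThrough } from 'node:stream'

// Format one SSE frame: an event name, a data line, then a blank line.
function sseFrame(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`
}

// Stand-in for res.stream in this sketch.
const out = new PassThrough()
out.write(sseFrame('progress', { percent: 50 }))
out.write(sseFrame('done', { percent: 100 }))
out.end()

// Collect what a client would have received over the wire.
const sent = (out.read() as Buffer).toString('utf-8')
```

Each write is delivered to the client as it happens, so events stream out over time rather than arriving in one response body.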
When one function produces data and another processes it, channels let the consumer start working before the producer finishes. This is the classic pipeline pattern.
Node / TypeScript
```typescript
import { registerWorker } from 'iii-sdk'
import type { ChannelReader } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction(
  { id: 'pipeline::consumer' },
  async (input: { label: string; reader: ChannelReader }) => {
    const chunks: Buffer[] = []
    for await (const chunk of input.reader.stream) {
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
    }
    const records = JSON.parse(Buffer.concat(chunks).toString('utf-8'))
    const total = records.reduce((sum: number, r: { value: number }) => sum + r.value, 0)
    return { label: input.label, count: records.length, total }
  },
)

iii.registerFunction(
  { id: 'pipeline::producer' },
  async (input: { records: { name: string; value: number }[] }) => {
    const channel = await iii.createChannel()
    const writePromise = new Promise<void>((resolve, reject) => {
      channel.writer.stream.end(
        Buffer.from(JSON.stringify(input.records)),
        (err?: Error | null) => (err ? reject(err) : resolve()),
      )
    })
    const result = await iii.trigger({
      function_id: 'pipeline::consumer',
      payload: { label: 'batch-001', reader: channel.readerRef },
    })
    await writePromise
    return result
  },
)
```
Python
```python
import json

from iii import register_worker
from iii.channels import ChannelReader

iii_client = register_worker("ws://localhost:49134")


def consumer_handler(input_data):
    reader: ChannelReader = input_data["reader"]
    raw = reader.read_all()
    records = json.loads(raw.decode("utf-8"))
    total = sum(r["value"] for r in records)
    return {
        "label": input_data["label"],
        "count": len(records),
        "total": total,
    }


def producer_handler(input_data):
    records = input_data["records"]
    channel = iii_client.create_channel()
    payload = json.dumps(records).encode("utf-8")
    channel.writer.write(payload)
    channel.writer.close()
    result = iii_client.trigger({
        "function_id": "pipeline::consumer",
        "payload": {
            "label": "batch-001",
            "reader": channel.reader_ref.model_dump(),
        },
    })
    return result


iii_client.register_function({"id": "pipeline::consumer"}, consumer_handler)
iii_client.register_function({"id": "pipeline::producer"}, producer_handler)
```
Rust
```rust
use iii_sdk::{
    extract_channel_refs, ChannelDirection, ChannelReader, IIIError,
    RegisterFunctionMessage, TriggerRequest, III,
};
use serde_json::{json, Value};

let iii_for_consumer = iii.clone();
iii.register_function(
    RegisterFunctionMessage {
        id: "pipeline::consumer".into(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    move |input: Value| {
        let iii = iii_for_consumer.clone();
        async move {
            let label = input["label"].as_str().unwrap_or_default().to_string();
            let refs = extract_channel_refs(&input);
            let reader_ref = refs
                .iter()
                .find(|(k, r)| k == "reader" && matches!(r.direction, ChannelDirection::Read))
                .map(|(_, r)| r.clone())
                .expect("missing reader channel ref");
            let reader = ChannelReader::new(iii.address(), &reader_ref);
            let raw = reader
                .read_all()
                .await
                .map_err(|e| IIIError::Handler(e.to_string()))?;
            let records: Vec<Value> = serde_json::from_slice(&raw)
                .map_err(|e| IIIError::Handler(e.to_string()))?;
            let total: f64 = records.iter().filter_map(|r| r["value"].as_f64()).sum();
            Ok(json!({ "label": label, "count": records.len(), "total": total }))
        }
    },
);

let iii_for_producer = iii.clone();
iii.register_function(
    RegisterFunctionMessage {
        id: "pipeline::producer".into(),
        description: None,
        request_format: None,
        response_format: None,
        metadata: None,
        invocation: None,
    },
    move |input: Value| {
        let iii = iii_for_producer.clone();
        async move {
            let records = input["records"].clone();
            let channel = iii
                .create_channel(None)
                .await
                .map_err(|e| IIIError::Handler(e.to_string()))?;
            let payload = serde_json::to_vec(&records)
                .map_err(|e| IIIError::Handler(e.to_string()))?;
            channel.writer.write(&payload).await
                .map_err(|e| IIIError::Handler(e.to_string()))?;
            channel.writer.close().await
                .map_err(|e| IIIError::Handler(e.to_string()))?;
            let result = iii
                .trigger(TriggerRequest {
                    function_id: "pipeline::consumer".to_string(),
                    payload: json!({
                        "label": "batch-001",
                        "reader": channel.reader_ref,
                    }),
                    action: None,
                    timeout_ms: Some(30000),
                })
                .await
                .map_err(|e| IIIError::Handler(e.to_string()))?;
            Ok(result)
        }
    },
);
```
When two functions need to exchange data in both directions, create two channels. The writer’s sendMessage() method provides a side-channel for text-based progress or metadata that doesn’t mix with the binary stream.
The coordinator creates two channels — one for input, one for output. The worker reads input, sends progress updates via sendMessage(), and writes the transformed result to the output channel. The coordinator collects both progress messages and the binary result.
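The shape of that data flow can be sketched self-contained with plain Node streams standing in for the two channels. This is an illustration of the pattern only: the real code would use ChannelReader/ChannelWriter pairs from iii.createChannel(), and progress would travel via writer.sendMessage() rather than the callback simulated here:

```typescript
import { PassThrough } from 'node:stream'

// Stand-ins for the input and output channels.
const inputChannel = new PassThrough()
const outputChannel = new PassThrough()

// The "worker": reads input, reports progress, writes the transformed result.
async function worker(onProgress: (msg: string) => void): Promise<void> {
  const chunks: Buffer[] = []
  for await (const chunk of inputChannel) {
    chunks.push(chunk)
    onProgress(`received ${chunk.length} bytes`) // real code: writer.sendMessage(...)
  }
  const transformed = Buffer.concat(chunks).toString('utf-8').toUpperCase()
  outputChannel.end(Buffer.from(transformed)) // end the writer so the reader finishes
}

// The "coordinator": sends input, then collects progress and the binary result.
async function coordinate(): Promise<{ progress: string[]; result: string }> {
  const progress: string[] = []
  const done = worker((msg) => progress.push(msg))
  inputChannel.end(Buffer.from('hello channels'))
  await done
  const out: Buffer[] = []
  for await (const chunk of outputChannel) out.push(chunk)
  return { progress, result: Buffer.concat(out).toString('utf-8') }
}
```

The key property carries over directly: each side only ever touches its own stream end, so the same code works when the worker runs in a different process or language.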
If you forget to call writer.stream.end() (or writer.close() in Python/Rust), the reader will hang waiting for more data. Always end the writer when you’re done.
Only pass readerRef or writerRef (the serializable tokens) inside trigger() payloads. The local reader and writer objects are not serializable and cannot cross process boundaries.
If you write data faster than the reader consumes it, backpressure is handled automatically — the WebSocket pauses when the buffer is full. In Node.js, respect the false return from writer.stream.write() and wait for the drain event before writing more.
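As an illustrative sketch of that drain pattern, here a plain Writable with an artificially small buffer stands in for a channel writer's stream (which behaves like any Node Writable):

```typescript
import { once } from 'node:events'
import { Writable } from 'node:stream'

// A slow sink with a tiny buffer so backpressure kicks in quickly.
const slowSink = new Writable({
  highWaterMark: 16, // bytes buffered before write() starts returning false
  write(_chunk, _enc, callback) {
    setTimeout(callback, 1) // simulate a consumer slower than the producer
  },
})

let drains = 0
async function writeAll(chunks: Buffer[]): Promise<number> {
  for (const chunk of chunks) {
    // write() returns false when the internal buffer is full...
    if (!slowSink.write(chunk)) {
      // ...so pause until the sink signals it has caught up.
      await once(slowSink, 'drain')
      drains++
    }
  }
  slowSink.end()
  return drains // how many times we had to pause for backpressure
}
```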