iii-stream is for real-time data transmission: pushing data to a client the moment it changes, like a live feed of clicks for a dashboard. A stream is bidirectional (subscribers can send messages back as well as receive them), but here you only need to broadcast clicks outward. You’ll move the live-broadcast concern into its own click-streamer worker so the link worker stays focused on links.

Add the workers

iii-stream is what will deliver clicks to clients in Chapter 7. A new click-streamer worker will manage the streaming. Add iii-stream, then scaffold click-streamer the same way you scaffolded link in Chapter 1 and analytics in Chapter 4:
iii worker add iii-stream
iii worker init click-streamer --language typescript

Broadcast clicks in real time

The link worker stays decoupled: it only announces that a click happened, and click-streamer reacts by pushing that click onto the live feed. A live counter can tolerate the rare dropped event, so a regular iii-pubsub event is the right tool here. First, publish a link.clicked event from link::record_click:
link/src/index.ts
worker.registerFunction(
  "link::record_click",
  async (payload: { code: string; clicked_at: string }) => {
    await worker.trigger({
      function_id: "database::execute",
      payload: {
        db: DB,
        sql: "INSERT INTO clicks (code, clicked_at) VALUES (?, ?)",
        params: [payload.code, payload.clicked_at],
      },
    });
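    // No await plus TriggerAction.Void(): the call returns immediately
    // instead of waiting for the publish to complete.
    worker.trigger({
      function_id: "publish",
      payload: { topic: "link.clicked", data: payload },
      action: TriggerAction.Void(),
    });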
    return { recorded: true };
  },
);
The new trigger call above omits await and sets its action to TriggerAction.Void(). The call therefore returns immediately, without waiting for the publish to complete, so link::record_click can respond sooner. This is a simple performance gain for calls like pubsub publishes, where you don’t need guaranteed execution.
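The difference between an awaited call and a fire-and-forget call can be sketched in plain TypeScript. This is only an analogy: fakePublish, recordAndWait, and recordAndForget are hypothetical names, not part of the iii SDK, and the sketch does not claim to reproduce what TriggerAction.Void() does inside the engine.

```typescript
// Plain-TypeScript analogy; none of these names come from the iii SDK.
const log: string[] = [];

// Hypothetical stand-in for a publish call that finishes asynchronously.
async function fakePublish(topic: string): Promise<void> {
  await Promise.resolve(); // yield once, like waiting on the network
  log.push(`published ${topic}`);
}

// Awaited: the caller resumes only after the publish has finished.
async function recordAndWait(): Promise<void> {
  await fakePublish("link.clicked");
  log.push("returned (awaited)");
}

// Fire-and-forget: start the publish, attach an error handler, return at once.
function recordAndForget(): void {
  fakePublish("link.clicked").catch((err: unknown) => {
    log.push(`publish failed: ${String(err)}`);
  });
  log.push("returned (fire-and-forget)");
}
```

In the fire-and-forget version the caller logs its return before the publish logs its completion. The trade-off is that the caller never learns whether the publish succeeded, which is acceptable for a live counter but not for the database insert.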

Set up the click-streamer worker

Now write the click-streamer worker. It subscribes to link.clicked and broadcasts each click to a clicks stream with stream::set, which both stores the item and pushes it to every WebSocket subscribed to that stream and group. Replace the contents of the generated click-streamer/src/index.ts with:
click-streamer/src/index.ts
import { registerWorker, Logger } from "iii-sdk";

const worker = registerWorker(process.env.III_URL ?? "ws://localhost:49134", {
  workerName: "click-streamer",
});
const logger = new Logger();

worker.registerFunction(
  "click-streamer::broadcast",
  async (data: { code: string; clicked_at: string }) => {
    // stream::set stores the click and pushes it to subscribers of clicks/all.
    await worker.trigger({
      function_id: "stream::set",
      payload: {
        stream_name: "clicks",
        group_id: "all",
        item_id: `${data.code}-${data.clicked_at}`,
        data,
      },
    });
    return { streamed: true };
  },
);

// Run click-streamer::broadcast for every event published on link.clicked.
worker.registerTrigger({
  type: "subscribe",
  function_id: "click-streamer::broadcast",
  config: { topic: "link.clicked" },
});

logger.info("click-streamer ready");
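To make the "store and push" behaviour of stream::set concrete, here is a toy in-memory model. It is a sketch under stated assumptions, not iii-stream's implementation: ToyStream and its methods are hypothetical, and the choice that a repeated item ID overwrites the earlier item is an assumption suggested by the name "set".

```typescript
// Toy in-memory model; the class and method names are hypothetical, not iii-stream's API.
type Listener<T> = (itemId: string, data: T) => void;

class ToyStream<T> {
  private groups = new Map<string, Map<string, T>>();
  private listeners = new Map<string, Set<Listener<T>>>();

  // Register a listener for one group; returns an unsubscribe function.
  subscribe(groupId: string, listener: Listener<T>): () => void {
    const set = this.listeners.get(groupId) ?? new Set<Listener<T>>();
    set.add(listener);
    this.listeners.set(groupId, set);
    return () => {
      set.delete(listener);
    };
  }

  // Store the item (assumed: the same item ID overwrites), then notify the group's listeners.
  set(groupId: string, itemId: string, data: T): void {
    const items = this.groups.get(groupId) ?? new Map<string, T>();
    items.set(itemId, data);
    this.groups.set(groupId, items);
    this.listeners.get(groupId)?.forEach((listener) => listener(itemId, data));
  }

  // Return the stored items for a group, in insertion order.
  list(groupId: string): T[] {
    const out: T[] = [];
    this.groups.get(groupId)?.forEach((data) => out.push(data));
    return out;
  }
}
```

Under this model, two clicks on the same code with an identical clicked_at value would share one item ID and occupy a single slot, while subscribers would still be notified twice.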
Register the click-streamer worker with your project:
iii worker add ./click-streamer
The browser you build in Chapter 7 subscribes to clicks/all and counts those broadcasts live.
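As a rough preview of the counting side, the sketch below tallies broadcast items per short code. It assumes each item carries the same code and clicked_at fields as the payload in this chapter; ClickEvent and tallyClicks are hypothetical names, and the actual Chapter 7 client may be structured differently.

```typescript
// Hypothetical shape of one broadcast item, mirroring the payload used in this chapter.
interface ClickEvent {
  code: string;
  clicked_at: string;
}

// Count clicks per short code from a sequence of broadcast events.
function tallyClicks(events: ClickEvent[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const event of events) {
    counts[event.code] = (counts[event.code] ?? 0) + 1;
  }
  return counts;
}
```

A live client would apply the same increment to each item as it arrives rather than to a finished array.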

See it work

With the engine running, create a link, follow it a few times, and read the live clicks stream:
curl -s -X POST http://127.0.0.1:3111/links \
  -H 'Content-Type: application/json' -d '{"url":"https://iii.dev","code":"stream-me"}'
for n in $(seq 1 3); do curl -s -o /dev/null http://127.0.0.1:3111/s/stream-me; done
iii trigger stream::list stream_name=clicks group_id=all
Each redirect lands in the stream as the click-streamer worker broadcasts it.

Conclusion

Linkly now streams every click to subscribers in real time through a dedicated click-streamer worker. Next, in Chapter 6: Move bulk data with channels, you’ll bulk-load links from a CSV in a single streamed upload.