In Chapter 3 every redirect triggers link::record_click directly, so the database write runs on the redirect’s hot path and a slow write slows the redirect. In this chapter you move that work onto a queue so redirects return immediately, then use pub/sub to broadcast link events to independent subscribers: a Python analytics worker and a cache refresher, both decoupled from the link worker.

Add the workers

This chapter uses two workers. iii-queue is already in your project from iii project init. Add iii-pubsub now; you publish your first event to it later in the chapter:
iii worker add iii-pubsub

Make redirects fast with a queue

A queue holds work that is accepted now and run later: each message in the clicks queue is one “insert a row recording that someone followed code X at time T.” The iii-queue worker is already in your project from iii project init, so defining the queue only takes a few updates to its queue_configs:
config.yaml
workers:
  # ...
  - name: iii-queue
    config:
      queue_configs:
        clicks:
          type: standard
          max_retries: 5
          concurrency: 5
      adapter:
        name: builtin
You already wrote link::record_click in Chapter 3, where http::redirect triggers it directly. Nothing about the function changes. You only change how it’s invoked. First import TriggerAction:
src/index.ts
import { registerWorker, Logger, TriggerAction } from "iii-sdk";
Then add an action to the existing link::record_click call in http::redirect so the iii-queue worker enqueues it instead of running it inline:
src/index.ts
// Enqueue the click and return right away; the consumer drains it later.
await worker.trigger({
  function_id: "link::record_click",
  payload: { code, clicked_at: new Date().toISOString() },
  action: TriggerAction.Enqueue({ queue: "clicks" }),
});
return { status_code: 302, headers: { Location: url } };
The redirect now returns as soon as the click is accepted onto the queue. link::record_click drains the queue in the background, with retries and a dead-letter queue if a write keeps failing.
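To make the retry and dead-letter behavior concrete, here is a minimal in-memory sketch of the idea. It is not how iii-queue is implemented: SketchQueue and its methods are hypothetical names, and it assumes max_retries counts retries after the first attempt, which the real worker may count differently.

```typescript
// Illustrative model only; SketchQueue is a hypothetical name, not an iii API.
type Click = { code: string; clicked_at: string };

interface Job<T> {
  payload: T;
  attempts: number; // number of failed attempts so far
}

class SketchQueue<T> {
  private pending: Job<T>[] = [];
  readonly deadLetters: Job<T>[] = [];
  private readonly maxRetries: number;

  constructor(maxRetries: number) {
    this.maxRetries = maxRetries;
  }

  // Producer side: accept the work and return immediately.
  enqueue(payload: T): void {
    this.pending.push({ payload, attempts: 0 });
  }

  // Consumer side: run the handler for each job. A failed job is re-queued
  // until its retry budget is spent, then parked in the dead-letter list.
  drain(handler: (payload: T) => void): void {
    while (this.pending.length > 0) {
      const job = this.pending.shift()!;
      try {
        handler(job.payload);
      } catch {
        job.attempts += 1;
        if (job.attempts > this.maxRetries) {
          this.deadLetters.push(job);
        } else {
          this.pending.push(job);
        }
      }
    }
  }
}

// Usage: one click writes cleanly, the other always fails.
const clicks = new SketchQueue<Click>(5);
clicks.enqueue({ code: "good", clicked_at: new Date().toISOString() });
clicks.enqueue({ code: "bad", clicked_at: new Date().toISOString() });

const written: string[] = [];
let badAttempts = 0;
clicks.drain((click) => {
  if (click.code === "bad") {
    badAttempts += 1;
    throw new Error("simulated database failure");
  }
  written.push(click.code);
});
```

Under that assumption the failing click is tried six times (one attempt plus five retries) before it lands in deadLetters, while the producer side never waits on any of it.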

Broadcast events with pub/sub

A queue delivers each message to one consumer. When several unrelated parts of the system need to react to the same event, use a publish/subscribe (pub/sub) design instead.
We ship both a iii-queue and a iii-pubsub worker. Besides standard queueing, iii-queue provides its own durable publish and subscribe. When a publish and subscribe flow must be guaranteed to succeed (or fail to a DLQ), use iii-queue’s iii::durable::publish and durable:subscriber. When you don’t need that guarantee, use iii-pubsub’s publish and subscribe.
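The difference from a queue is the delivery rule: every subscriber of a topic gets its own copy of each event. The sketch below models only that rule in memory. SketchTopicBus is a hypothetical name, and the real iii-pubsub worker delivers events between processes rather than through direct function calls.

```typescript
// Illustrative model only; SketchTopicBus is a hypothetical name, not an iii API.
type Handler<T> = (data: T) => void;

class SketchTopicBus<T> {
  private subscribers = new Map<string, Handler<T>[]>();

  subscribe(topic: string, handler: Handler<T>): void {
    const list = this.subscribers.get(topic) ?? [];
    list.push(handler);
    this.subscribers.set(topic, list);
  }

  // Fan-out: every subscriber of the topic is called with the event.
  // Returns how many subscribers were reached.
  publish(topic: string, data: T): number {
    const list = this.subscribers.get(topic) ?? [];
    for (const handler of list) handler(data);
    return list.length;
  }
}

// Usage: two independent subscribers react to the same event.
type LinkEvent = { code: string; url: string };
const bus = new SketchTopicBus<LinkEvent>();

let dailyCount = 0;
const seenCodes: string[] = [];
bus.subscribe("link.created", () => { dailyCount += 1; });
bus.subscribe("link.created", (event) => { seenCodes.push(event.code); });

const reached = bus.publish("link.created", { code: "link1", url: "https://iii.dev/1" });
// Nobody subscribed to this topic on this bus, so the event reaches no one.
const unheard = bus.publish("link.updated", { code: "link1", url: "https://iii.dev/2" });
```

The publisher never names its subscribers, which is what lets you add or remove consumers without touching the code that publishes.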
Here we’ll publish an event whenever a link is created or its target changes. Link updating doesn’t exist yet, so we’ll add it along with an HTTP endpoint. Start with creation: inside link::create, after the database write and state::set, trigger the built-in publish function:
src/index.ts
// ...inside link::create, after the database write and state::set:
await worker.trigger({
  function_id: "publish",
  payload: { topic: "link.created", data: { code, url } },
});
Add an update path so a link’s target can change, and announce it. First, the domain function: it updates the database row and publishes a link.updated event through durable pub/sub (iii::durable::publish, served by iii-queue):
src/index.ts
worker.registerFunction("link::update", async (payload: { code: string; url: string }) => {
  const url = /^https?:\/\//i.test(payload.url) ? payload.url : `https://${payload.url}`;
  await worker.trigger({
    function_id: "database::execute",
    payload: {
      db: DB,
      sql: "UPDATE links SET url = ? WHERE code = ?",
      params: [url, payload.code],
    },
  });
  await worker.trigger({
    function_id: "iii::durable::publish",
    payload: { topic: "link.updated", data: { code: payload.code, url } },
  });
  return { code: payload.code, url };
});
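The first line of link::update normalizes the incoming URL before it is stored. Pulled out into a standalone helper (normalizeUrl is a hypothetical name used only for illustration), the same expression behaves like this:

```typescript
// Same expression as in link::update, extracted so it can be tried in isolation.
// normalizeUrl is a hypothetical helper name, not part of the tutorial project.
function normalizeUrl(input: string): string {
  // Keep the input if it already starts with http:// or https:// (case-insensitive);
  // otherwise assume https.
  return /^https?:\/\//i.test(input) ? input : `https://${input}`;
}

const bare = normalizeUrl("iii.dev/docs");            // "https://iii.dev/docs"
const upper = normalizeUrl("HTTP://example.com");     // "HTTP://example.com" (unchanged)
const otherScheme = normalizeUrl("ftp://example.com"); // "https://ftp://example.com"
```

Note the last case: a non-HTTP scheme is not rejected, it is prefixed. If that matters for your links, validate the scheme in the HTTP handler before calling link::update.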
Then the HTTP handler that validates input and calls the domain function:
src/index.ts
worker.registerFunction("http::update", async (req) => {
  const code = req.path_params.code;
  const url = req.body?.url;
  if (!url) {
    return {
      status_code: 400,
      body: { error: 'missing "url"' },
      headers: { "Content-Type": "application/json" },
    };
  }
  const link = await worker.trigger<{ code: string; url: string }, { code: string; url: string }>({
    function_id: "link::update",
    payload: { code, url },
  });
  return { status_code: 200, body: link, headers: { "Content-Type": "application/json" } };
});
And the trigger that binds it to PUT /links/:code:
src/index.ts
worker.registerTrigger({
  type: "http",
  function_id: "http::update",
  config: { api_path: "/links/:code", http_method: "PUT" },
});

Add reactive state: Keep the cache correct without coupling

link::update changes the database but not the state cache, so a query could serve stale data.
Durable vs. regular pub/sub. link.updated uses durable pub/sub: iii::durable::publish with a durable:subscriber trigger, both served by the iii-queue worker. Consumers like this cache refresher must receive every update. A dropped event would leave the cache pointing at a stale URL. link.created stays on regular pub/sub (iii-pubsub). Its only consumer is a best-effort daily counter, so an occasional miss is harmless. Use durable pub/sub when a missed event would corrupt state, and regular pub/sub for fire-and-forget fan-out.
Rather than refresh the state cache inside link::update, subscribe to the link.updated event with a durable subscriber:
src/index.ts
worker.registerFunction("link::on_link_updated", async (data: { code: string; url: string }) => {
  await worker.trigger({
    function_id: "state::set",
    payload: { scope: "links", key: data.code, value: { url: data.url } },
  });
});

worker.registerTrigger({
  type: "durable:subscriber",
  function_id: "link::on_link_updated",
  config: { topic: "link.updated" },
});
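Why the cache refresher needs the durable flavor is easiest to see when a delivery fails. The sketch below contrasts the two behaviors in memory. BestEffortTopic and DurableTopicSketch are hypothetical names, and the retention rule shown (keep an event until its handler succeeds) is an assumption about what "durable" means here, not a description of iii-queue internals.

```typescript
// Illustrative model only; both classes are hypothetical, not iii APIs.
type LinkUpdated = { code: string; url: string };
type UpdateHandler = (event: LinkUpdated) => void;

// Best-effort: a failed delivery is simply lost.
class BestEffortTopic {
  private handlers: UpdateHandler[] = [];
  subscribe(handler: UpdateHandler): void {
    this.handlers.push(handler);
  }
  publish(event: LinkUpdated): void {
    for (const handler of this.handlers) {
      try {
        handler(event);
      } catch {
        // Nothing retries; the event is gone.
      }
    }
  }
}

// Durable: an event stays in the backlog until its handler succeeds.
class DurableTopicSketch {
  private backlog: LinkUpdated[] = [];
  publish(event: LinkUpdated): void {
    this.backlog.push(event);
  }
  // Attempts delivery in order and stops at the first failure.
  // Returns how many events were delivered in this pass.
  deliver(handler: UpdateHandler): number {
    let delivered = 0;
    while (this.backlog.length > 0) {
      try {
        handler(this.backlog[0]);
      } catch {
        break;
      }
      this.backlog.shift();
      delivered += 1;
    }
    return delivered;
  }
}

// Usage: a cache refresher that fails once, then recovers.
const cache = new Map<string, string>([["abc", "https://old.example"]]);
let failNext = true;
const refresh: UpdateHandler = (event) => {
  if (failNext) {
    failNext = false;
    throw new Error("cache temporarily unavailable");
  }
  cache.set(event.code, event.url);
};

const bestEffort = new BestEffortTopic();
bestEffort.subscribe(refresh);
bestEffort.publish({ code: "abc", url: "https://new.example" });
const afterBestEffort = cache.get("abc"); // still the old URL

failNext = true;
const durable = new DurableTopicSketch();
durable.publish({ code: "abc", url: "https://new.example" });
const firstPass = durable.deliver(refresh);  // delivery fails, event is kept
const secondPass = durable.deliver(refresh); // retry succeeds
const afterDurable = cache.get("abc");
```

With best-effort delivery the cache keeps pointing at the old URL after a single failure. With the durable sketch the event survives the failed pass and the retry brings the cache up to date.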

Create an analytics worker in Python

Queues and events are useful not only within a single worker but also between workers. So far all of our code has been TypeScript, but workers are not restricted to a specific language or runtime, so this time we’ll create an analytics worker in Python to count links.

Create a new worker

Scaffold a Python worker the same way you scaffolded the link worker in Chapter 1. The command below generates an analytics/ worker with an example main.py and a iii.worker.yaml manifest:
iii worker init analytics --language python
Replace the example main.py with this one, which subscribes to link.created events and counts how many new short links are created each day:
analytics/main.py
import os
from datetime import datetime, timezone

from iii import register_worker, InitOptions, Logger

worker = register_worker(
    os.environ.get("III_URL", "ws://localhost:49134"),
    InitOptions(worker_name="analytics"),
)
logger = Logger()

DB = "analytics"

def ensure_schema() -> None:
    """The analytics worker owns its own table, in its own database."""
    worker.trigger(
        {
            "function_id": "database::execute",
            "payload": {
                "db": DB,
                "sql": "CREATE TABLE IF NOT EXISTS daily_link_counts (day TEXT PRIMARY KEY, count INTEGER NOT NULL)",
            },
        }
    )

def on_link_created(data: dict) -> dict:
    """Runs whenever the link worker publishes `link.created`. Counts links per day."""
    day = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    worker.trigger(
        {
            "function_id": "database::execute",
            "payload": {
                "db": DB,
                "sql": "INSERT INTO daily_link_counts (day, count) VALUES (?, 1) "
                "ON CONFLICT(day) DO UPDATE SET count = count + 1",
                "params": [day],
            },
        }
    )
    logger.info(f"counted new link {data.get('code')} for {day}")
    return {"ok": True}

ensure_schema()

worker.register_function("analytics::on_link_created", on_link_created)
worker.register_trigger(
    {
        "type": "subscribe",
        "function_id": "analytics::on_link_created",
        "config": {"topic": "link.created"},
    }
)

print("Analytics worker started")
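The counting logic in on_link_created is an upsert keyed on the UTC calendar day: the first event of a day inserts a row with count 1, and every later event that day increments it. For readers following along in TypeScript, here is a minimal in-memory model of that behavior. utcDay and countLink are hypothetical helper names, and a Map stands in for the daily_link_counts table.

```typescript
// Illustrative model of the upsert; a Map stands in for daily_link_counts.
// utcDay and countLink are hypothetical names, not part of the tutorial project.

// toISOString() is always UTC, so the first 10 characters are YYYY-MM-DD in UTC,
// matching strftime("%Y-%m-%d") on a timezone.utc datetime.
function utcDay(now: Date): string {
  return now.toISOString().slice(0, 10);
}

// Insert the day with count 1, or increment the existing count.
function countLink(counts: Map<string, number>, now: Date): number {
  const day = utcDay(now);
  const next = (counts.get(day) ?? 0) + 1;
  counts.set(day, next);
  return next;
}

// Usage: two links just before midnight UTC, one just after.
const counts = new Map<string, number>();
countLink(counts, new Date("2026-05-27T10:00:00Z"));
countLink(counts, new Date("2026-05-27T23:59:59Z"));
countLink(counts, new Date("2026-05-28T00:00:01Z"));
```

Because the day is computed in UTC, links created late in the evening in a timezone behind UTC are counted toward the next calendar day.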

Configure the worker’s manifest

The generated manifest has no run scripts yet, so give it an install and start script:
iii.worker.yaml
scripts:
  install: "pip install ."
  start: "watchfiles 'python main.py'"
Analytics keeps its counts in its own database, so the link worker never has to know it exists. Add an analytics database to the database worker, alongside the primary one from Chapter 3:
config.yaml
workers:
  # ...
  - name: database
    config:
      databases:
        primary:
          # ...
          url: sqlite:./data/iii.db
        analytics:
          url: sqlite:./data/analytics.db
Finally, add the new analytics worker to your config.yaml:
iii worker add ./analytics

See it work

Create five links:
# Make some new links
for n in $(seq 1 5); do
  curl -s -X POST http://127.0.0.1:3111/links \
    -H 'Content-Type: application/json' -d "{\"url\":\"https://iii.dev/$n\",\"code\":\"link$n\"}"
done
The Python worker keeps count of new link creations as expected:
iii trigger database::query db=analytics sql="SELECT day, count FROM daily_link_counts"
{ "rows": [{ "day": "2026-05-27", "count": 5 }], "row_count": 1 }

Conclusion

Redirects no longer wait on a database write: click rows ride a queue, drained in the background. Link events fan out over pub/sub to a Python analytics counter and a cache refresher, and the link worker does not know either of them exists. Next, in Ch. 5: Stream live clicks, you broadcast every click in real time from a dedicated click-streamer worker.