# Developing Custom Workers

> Extend the iii engine by building custom workers that register functions and triggers and integrate with external systems.

Custom workers extend the core functionality of the iii Engine. A worker is a container for logic: it registers functions and triggers with the engine and integrates with external systems.

## Introduction

Workers are dynamically loaded and configured. They often use the Adapter pattern so that backend implementations can be swapped through configuration (for example, replacing an in-memory event bus with a Redis-backed one).

The engine provides a trait-based system where workers implement the `CoreWorker` trait for lifecycle management and the `ConfigurableWorker` trait for handling configuration and adapter injection.

```mermaid theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
graph TD
    Engine[iii Engine] -->|Loads| Worker[Custom Worker]
    Worker -->|Uses| Adapter[Adapter Interface]
    Adapter -.->|implements| Redis[Redis Adapter]
    Adapter -.->|implements| Memory[In-Memory Adapter]
    Adapter -.->|implements| Custom[Custom Adapter]

    Worker -->|Registers| Functions[Functions]
    Worker -->|Registers| Triggers[Triggers]
```

## Worker Architecture

The worker system is built around two primary traits: `CoreWorker` and `ConfigurableWorker`.

### Core Traits

| Trait                  | Description                                                                                   | Key Methods                                                                                           |
| ---------------------- | --------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------- |
| **CoreWorker**         | The base trait for all workers. Handles lifecycle, identification, and function registration. | `name()`, `create()`, `initialize()`, `register_functions()`, `start_background_tasks()`, `destroy()` |
| **ConfigurableWorker** | Extends `CoreWorker` to support typed configuration and pluggable adapters.                   | `build()`, `registry()`, `adapter_name_from_config()`                                                 |

### Lifecycle Flow

The following diagram illustrates the lifecycle of a worker from creation through initialization and function registration.

```mermaid theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
sequenceDiagram
    participant Engine
    participant Builder
    participant Worker
    participant Adapter

    Note over Builder, Worker: Worker Registration Phase
    Builder->>Worker: create(engine, config)

    alt is ConfigurableWorker
        Worker->>Worker: create_with_adapters(engine, config)
        Worker->>Adapter: factory(engine, config)
        Adapter-->>Worker: Arc<AdapterInstance>
        Worker->>Worker: build(engine, config, adapter)
    end

    Builder->>Worker: initialize()
    activate Worker
    Worker->>Engine: register_trigger_type() (Optional)
    Worker-->>Builder: Result
    deactivate Worker

    Builder->>Worker: register_functions(engine)
    activate Worker
    Worker->>Engine: register_function()
    deactivate Worker
```
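
The ordering in the diagram can be modeled with a small synchronous stand-in. This is only a sketch under simplifying assumptions: `LifecycleSketch`, `RecordingWorker`, and `run_lifecycle` are hypothetical names, and the real `CoreWorker` methods are async and receive engine handles.

```rust
use std::cell::RefCell;

/// Hypothetical, synchronous stand-in for two of the lifecycle hooks.
trait LifecycleSketch {
    fn initialize(&self) -> Result<(), String>;
    fn register_functions(&self) -> Result<(), String>;
}

/// Records which hooks ran, in order, so the sequence can be inspected.
struct RecordingWorker {
    calls: RefCell<Vec<&'static str>>,
}

impl RecordingWorker {
    fn new() -> Self {
        Self { calls: RefCell::new(Vec::new()) }
    }
}

impl LifecycleSketch for RecordingWorker {
    fn initialize(&self) -> Result<(), String> {
        self.calls.borrow_mut().push("initialize");
        Ok(())
    }

    fn register_functions(&self) -> Result<(), String> {
        self.calls.borrow_mut().push("register_functions");
        Ok(())
    }
}

/// Mirrors the builder in the diagram: initialize first, then register
/// functions, stopping at the first error.
fn run_lifecycle(worker: &dyn LifecycleSketch) -> Result<(), String> {
    worker.initialize()?;
    worker.register_functions()
}
```

Because `run_lifecycle` uses `?`, a failing `initialize` prevents `register_functions` from running at all, which matches the `Result` returned to the builder in the diagram.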

## Implementing a Configurable Worker

Developing a custom worker typically involves defining an adapter interface, implementing specific adapters, and then wrapping them in a worker structure.

### Step 1: Define the Adapter Trait

Define a trait annotated with `#[async_trait]` that specifies the behavior your worker's backend must implement. This allows users to switch implementations via configuration.

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
use async_trait::async_trait;
use serde_json::Value;

#[async_trait]
pub trait CustomEventAdapter: Send + Sync + 'static {
    async fn emit(&self, topic: &str, event_data: Value);
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str);
    async fn unsubscribe(&self, topic: &str, id: &str);
}
```

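**Why async\_trait?** The adapter is used as a trait object (`Arc<dyn CustomEventAdapter>`), and a trait that declares `async fn` directly cannot be used as a trait object. The `#[async_trait]` macro rewrites each async method to return a boxed future, which keeps the trait usable behind `dyn`.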
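
To see what the macro does, the sketch below hand-writes roughly the shape it generates for a single method: an ordinary method that returns a pinned, boxed future. It uses only the standard library. `EmitSketch`, `CountingAdapter`, `NoopWake`, and the tiny `block_on` helper are illustrative names, not part of iii or `async_trait`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Approximately the shape `#[async_trait]` gives an `async fn emit`:
/// a boxed future, which keeps the trait usable as `dyn EmitSketch`.
trait EmitSketch: Send + Sync {
    fn emit<'a>(&'a self, topic: &'a str) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}

struct CountingAdapter {
    emitted: AtomicUsize,
}

impl EmitSketch for CountingAdapter {
    fn emit<'a>(&'a self, topic: &'a str) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            // Count only non-empty topics; the body reads like any async fn.
            if !topic.is_empty() {
                self.emitted.fetch_add(1, Ordering::SeqCst);
            }
        })
    }
}

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this sketch; real code would use a runtime such as Tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The cost of this design is one heap allocation per call, which is usually negligible next to the I/O an adapter performs.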

### Step 2: Implement Adapter Registration

To make adapters discoverable by the configuration system, you must define a registration struct and use the `inventory` crate.

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
use std::sync::Arc;
use std::future::Future;
use std::pin::Pin;
use iii::Engine;

pub type CustomEventAdapterFuture = Pin<
    Box<dyn Future<Output = anyhow::Result<Arc<dyn CustomEventAdapter>>> + Send>
>;

pub struct CustomEventAdapterRegistration {
    pub name: &'static str,
    pub factory: fn(Arc<Engine>, Option<Value>) -> CustomEventAdapterFuture,
}

// Implement AdapterRegistrationEntry trait
impl AdapterRegistrationEntry<dyn CustomEventAdapter> for CustomEventAdapterRegistration {
    fn name(&self) -> &'static str {
        self.name
    }

    fn factory(&self) -> fn(Arc<Engine>, Option<Value>) -> CustomEventAdapterFuture {
        self.factory
    }
}

// Register the type with inventory
inventory::collect!(CustomEventAdapterRegistration);
```

**Purpose**: This registration system allows the engine to discover and instantiate adapters dynamically based on configuration.
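
As a rough mental model of that discovery step, the sketch below maps adapter names to factory functions and resolves one by name with a default fallback. It is a simplified, synchronous illustration using only the standard library: `SinkSketch`, `FactorySketch`, `build_registry_sketch`, and `resolve` are hypothetical names, and the real system collects registrations through `inventory` and returns futures.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical adapter interface, kept synchronous for brevity.
trait SinkSketch: Send + Sync {
    fn kind(&self) -> &'static str;
}

struct InMemorySink;

impl SinkSketch for InMemorySink {
    fn kind(&self) -> &'static str {
        "in-memory"
    }
}

struct NullSink;

impl SinkSketch for NullSink {
    fn kind(&self) -> &'static str {
        "null"
    }
}

/// A factory takes optional raw configuration and builds an adapter.
type FactorySketch = fn(Option<&str>) -> Result<Arc<dyn SinkSketch>, String>;

fn make_in_memory(_config: Option<&str>) -> Result<Arc<dyn SinkSketch>, String> {
    Ok(Arc::new(InMemorySink))
}

fn make_null(_config: Option<&str>) -> Result<Arc<dyn SinkSketch>, String> {
    Ok(Arc::new(NullSink))
}

/// Stand-in for the set of registrations that would be collected automatically.
fn build_registry_sketch() -> HashMap<&'static str, FactorySketch> {
    let mut registry: HashMap<&'static str, FactorySketch> = HashMap::new();
    registry.insert("my::InMemorySink", make_in_memory);
    registry.insert("my::NullSink", make_null);
    registry
}

/// Resolve the configured adapter name, falling back to a default name.
fn resolve(
    registry: &HashMap<&'static str, FactorySketch>,
    configured: Option<&str>,
    default_name: &str,
) -> Result<Arc<dyn SinkSketch>, String> {
    let name = configured.unwrap_or(default_name);
    let factory = registry
        .get(name)
        .ok_or_else(|| format!("unknown adapter: {name}"))?;
    factory(None)
}
```

Returning an error for an unknown name, rather than silently using the default, makes configuration typos visible at startup.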

### Step 3: Create Adapter Factories

Define factory functions that instantiate your specific adapter implementations (e.g., `InMemory` or `Logging`).

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
use iii::register_adapter;

fn make_inmemory_adapter(
    engine: Arc<Engine>,
    config: Option<Value>
) -> CustomEventAdapterFuture {
    Box::pin(async move {
        Ok(Arc::new(InMemoryEventAdapter::new(config, engine).await?)
            as Arc<dyn CustomEventAdapter>)
    })
}

// Register the specific adapter implementation
register_adapter!(
    <CustomEventAdapterRegistration>
    "my::InMemoryEventAdapter",
    make_inmemory_adapter
);
```

### Step 4: Implement Adapter Logic

Create the actual adapter implementations.

<AccordionGroup>
  <Accordion title="In-Memory Adapter" icon="memory-stick">
    Simple in-memory implementation for development and testing.

    ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    use std::collections::HashMap;
    use tokio::sync::RwLock;

    pub struct InMemoryEventAdapter {
        subscriptions: Arc<RwLock<HashMap<String, HashMap<String, String>>>>,
        engine: Arc<Engine>,
    }

    impl InMemoryEventAdapter {
        pub async fn new(
            _config: Option<Value>,
            engine: Arc<Engine>
        ) -> anyhow::Result<Self> {
            Ok(Self {
                subscriptions: Arc::new(RwLock::new(HashMap::new())),
                engine,
            })
        }
    }

    #[async_trait]
    impl CustomEventAdapter for InMemoryEventAdapter {
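        async fn emit(&self, topic: &str, event_data: Value) {
            // Snapshot the subscribers and release the read lock before awaiting,
            // so a handler that subscribes or unsubscribes cannot deadlock on it.
            let function_ids: Vec<String> = {
                let subs = self.subscriptions.read().await;
                subs.get(topic)
                    .map(|by_id| by_id.values().cloned().collect())
                    .unwrap_or_default()
            };
            for function_id in &function_ids {
                let _ = self.engine.call(function_id, event_data.clone()).await;
            }
        }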

        async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
            let mut subs = self.subscriptions.write().await;
            subs.entry(topic.to_string())
                .or_insert_with(HashMap::new)
                .insert(id.to_string(), function_id.to_string());
        }

        async fn unsubscribe(&self, topic: &str, id: &str) {
            let mut subs = self.subscriptions.write().await;
            if let Some(by_id) = subs.get_mut(topic) {
                by_id.remove(id);
            }
        }
    }
    ```
  </Accordion>

  <Accordion title="Logging Wrapper Adapter" icon="file-text">
    Wrapper adapter that logs all events while delegating to another adapter.

    ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    pub struct LoggingEventAdapter {
        inner: Arc<dyn CustomEventAdapter>,
    }

    #[async_trait]
    impl CustomEventAdapter for LoggingEventAdapter {
        async fn emit(&self, topic: &str, event_data: Value) {
            tracing::info!(
                topic = %topic,
                event_data = %event_data,
                "Emitting event"
            );
            self.inner.emit(topic, event_data).await;
        }

        async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
            tracing::info!(topic = %topic, "Subscribing to topic");
            self.inner.subscribe(topic, id, function_id).await;
        }

        async fn unsubscribe(&self, topic: &str, id: &str) {
            tracing::info!(topic = %topic, "Unsubscribing from topic");
            self.inner.unsubscribe(topic, id).await;
        }
    }
    ```
  </Accordion>
</AccordionGroup>

### Step 5: Implement the Worker Logic

The worker struct holds the `Engine` reference and the injected `Adapter`.

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
use serde::Deserialize;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;

#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomEventModuleConfig {
    #[serde(default)]
    pub adapter: Option<AdapterEntry>,
}

#[derive(Clone)]
pub struct CustomEventModule {
    adapter: Arc<dyn CustomEventAdapter>,
    engine: Arc<Engine>,
    _config: CustomEventModuleConfig,
}

#[async_trait]
impl ConfigurableWorker for CustomEventModule {
    type Config = CustomEventModuleConfig;
    type Adapter = dyn CustomEventAdapter;
    type AdapterRegistration = CustomEventAdapterRegistration;
    const DEFAULT_ADAPTER_NAME: &'static str = "my::InMemoryEventAdapter";

    // Define how to access the registry
    async fn registry() -> &'static RwLock<HashMap<String, AdapterFactory<Self::Adapter>>> {
        static REGISTRY: Lazy<RwLock<HashMap<String, AdapterFactory<dyn CustomEventAdapter>>>> =
            Lazy::new(|| RwLock::new(CustomEventModule::build_registry()));

        &REGISTRY
    }

    // Builder method
    fn build(
        engine: Arc<Engine>,
        config: Self::Config,
        adapter: Arc<Self::Adapter>
    ) -> Self {
        Self {
            engine,
            _config: config,
            adapter
        }
    }
}
```

## Registering Functions

Workers expose functionality to the engine (and thus to SDK workers) by registering functions. This is typically done in the `initialize` method or `register_functions`.

### Registration Request Structure

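When registering a function, you must provide a `RegisterFunctionRequest`. The examples on this page also set a `metadata` field to `None`; it is not described in the table below.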

| Field             | Type             | Description                                |
| ----------------- | ---------------- | ------------------------------------------ |
| `function_id`     | `String`         | Unique function ID (e.g., "custom::emit")  |
| `description`     | `Option<String>` | Human-readable description of the function |
| `request_format`  | `Option<Value>`  | JSON Schema defining the expected input    |
| `response_format` | `Option<Value>`  | JSON Schema defining the expected output   |

<Note>
  When using the `#[service]` macro with `#[function]` attributes, `request_format` and `response_format` are **auto-generated** as standard JSON Schema from your Rust types (via [`schemars`](https://docs.rs/schemars)). Your input/output types must derive `JsonSchema`. Manual schema specification is only needed for custom worker registration.
</Note>

### Example Registration

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
use iii::RegisterFunctionRequest;

#[async_trait]
impl CoreWorker for CustomEventModule {
    fn name(&self) -> &str {
        "custom_event"
    }

    async fn initialize(&self) -> anyhow::Result<()> {
        self.engine.register_function(
            RegisterFunctionRequest {
                function_id: "custom::emit".to_string(),
                description: Some("Emit a custom event".to_string()),
                request_format: Some(serde_json::json!({
                    "type": "object",
                    "properties": {
                        "topic": { "type": "string" },
                        "data": { "type": "object" }
                    },
                    "required": ["topic", "data"]
                })),
                response_format: None,
                metadata: None,
            },
            Box::new(self.clone()), // The handler
        );
        Ok(())
    }

    async fn register_functions(&self, _engine: Arc<Engine>) -> anyhow::Result<()> {
        // Additional function registrations can go here
        Ok(())
    }
}
```

## Handling Function Invocations

To handle invocations, the worker (or a specific handler struct) must implement the `FunctionHandler` trait.

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
use iii::{FunctionHandler, FunctionResult};

#[async_trait]
impl FunctionHandler for CustomEventModule {
    async fn handle(&self, input: Value) -> FunctionResult {
        // 1. Parse Input
        let topic = input.get("topic")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing 'topic' field"))?;

        let data = input.get("data")
            .cloned()
            .unwrap_or(Value::Null);

        // 2. Execute Logic (using the adapter)
        self.adapter.emit(topic, data).await;

        // 3. Return Result
        FunctionResult::Success(None)
    }
}
```

## Registering Triggers

Workers can also act as sources of events by registering `TriggerType`s. This allows the engine to route external events (like Cron ticks or HTTP requests) to specific functions.

### Trigger Architecture

```mermaid theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
graph TD
    Worker[Core Worker] -->|Registers| TT[TriggerType]
    TT -->|Contains| Registrator[TriggerRegistrator]

    Worker -->|Sends registertrigger| Engine
    Engine -->|Delegates to| Registrator
    Registrator -->|Stores| TriggerDefinition

    ExternalEvent[External Event e.g., Timer/HTTP] --> Worker
    Worker -->|Look up| TriggerDefinition
    Worker -->|Invoke| Engine
    Engine -->|Route to| Worker
```

### Implementation

To support triggers, a worker implements `TriggerRegistrator`.

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
use iii::{TriggerRegistrator, Trigger, TriggerType};
use std::future::Future;
use std::pin::Pin;

impl TriggerRegistrator for CustomEventModule {
    fn register_trigger(
        &self,
        trigger: Trigger,
    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
        Box::pin(async move {
            // Extract configuration
            let config = trigger.config;
            let subscribes = config.get("subscribes")
                .and_then(|v| v.as_array())
                .ok_or_else(|| anyhow::anyhow!("Missing 'subscribes' array"))?;

            // Subscribe to each topic
            for topic in subscribes {
                let topic_str = topic.as_str()
                    .ok_or_else(|| anyhow::anyhow!("Invalid topic"))?;

                self.adapter.subscribe(
                    topic_str,
                    &trigger.id,
                    &trigger.function_id
                ).await;
            }

            Ok(())
        })
    }
}
```

Then, register the trigger type during initialization:

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
async fn initialize(&self) -> anyhow::Result<()> {
    // Register functions
    // ... (function registration code)

    // Register trigger type
    let trigger_type = TriggerType {
        id: "event".to_string(),
        registrator: Box::new(self.clone()),
        description: Some("Event-based trigger".to_string()),
    };

    self.engine.register_trigger_type(trigger_type).await?;

    Ok(())
}
```

## Configuration

Workers are configured via `iii-config.yaml` or JSON passed during initialization. The `ConfigurableWorker` trait maps this configuration to a Rust struct.

### Configuration Struct

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomEventModuleConfig {
    #[serde(default)]
    pub adapter: Option<AdapterEntry>,
}
```

### Usage in Config File

```yaml theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
workers:
  - name: my::CustomEventModule
    config:
      adapter:
        name: my::LoggingEventAdapter
        config:
          inner_adapter: my::InMemoryEventAdapter
```

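**Nested Adapters**: The logging adapter wraps the in-memory adapter, applying the decorator pattern to a cross-cutting concern (logging). This configuration assumes that a factory for `my::LoggingEventAdapter` has been registered in the same way as the in-memory factory in Step 3, and that it reads `inner_adapter` to build the adapter it wraps. That factory is not shown on this page.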
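
The wrapping itself can be illustrated independently of the engine. The sketch below is synchronous and uses only the standard library; `EventSinkSketch`, `MemorySink`, and `LoggingSink` are hypothetical names, and it makes no claim about how the engine resolves `inner_adapter`.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical adapter interface shared by the wrapper and the wrapped type.
trait EventSinkSketch: Send + Sync {
    fn emit(&self, topic: &str);
}

/// Innermost adapter: remembers every topic it was asked to emit.
#[derive(Default)]
struct MemorySink {
    emitted: Mutex<Vec<String>>,
}

impl EventSinkSketch for MemorySink {
    fn emit(&self, topic: &str) {
        self.emitted.lock().unwrap().push(topic.to_string());
    }
}

/// Decorator: records a log line, then delegates to whatever it wraps.
struct LoggingSink {
    inner: Arc<dyn EventSinkSketch>,
    log: Mutex<Vec<String>>,
}

impl LoggingSink {
    fn new(inner: Arc<dyn EventSinkSketch>) -> Self {
        Self { inner, log: Mutex::new(Vec::new()) }
    }
}

impl EventSinkSketch for LoggingSink {
    fn emit(&self, topic: &str) {
        self.log.lock().unwrap().push(format!("emit {topic}"));
        self.inner.emit(topic);
    }
}
```

Because `LoggingSink` holds an `Arc<dyn EventSinkSketch>` rather than a concrete type, it can wrap any adapter, including another decorator.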

## Complete Example

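The following example consolidates the adapter trait, the in-memory adapter, the worker, its function handler, and its trigger registrator. The adapter registration from Steps 2 and 3 and the `ConfigurableWorker` implementation from Step 5 are omitted here.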

### examples/custom\_event\_module.rs

```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use iii::{
    Engine, CoreWorker, ConfigurableWorker, FunctionHandler,
    FunctionResult, RegisterFunctionRequest, TriggerRegistrator,
    Trigger, TriggerType, AdapterEntry
};

// 1. Define Adapter Trait
#[async_trait]
pub trait CustomEventAdapter: Send + Sync + 'static {
    async fn emit(&self, topic: &str, event_data: Value);
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str);
    async fn unsubscribe(&self, topic: &str, id: &str);
}

// 2. Implement In-Memory Adapter
pub struct InMemoryEventAdapter {
    subscriptions: Arc<RwLock<HashMap<String, HashMap<String, String>>>>,
    engine: Arc<Engine>,
}

impl InMemoryEventAdapter {
    pub async fn new(_config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
        Ok(Self {
            subscriptions: Arc::new(RwLock::new(HashMap::new())),
            engine,
        })
    }
}

#[async_trait]
impl CustomEventAdapter for InMemoryEventAdapter {
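    async fn emit(&self, topic: &str, event_data: Value) {
        // Snapshot the subscribers and release the read lock before awaiting,
        // so a handler that subscribes or unsubscribes cannot deadlock on it.
        let function_ids: Vec<String> = {
            let subs = self.subscriptions.read().await;
            subs.get(topic)
                .map(|by_id| by_id.values().cloned().collect())
                .unwrap_or_default()
        };
        for function_id in &function_ids {
            let _ = self.engine.call(function_id, event_data.clone()).await;
        }
    }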

    async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
        let mut subs = self.subscriptions.write().await;
        subs.entry(topic.to_string())
            .or_insert_with(HashMap::new)
            .insert(id.to_string(), function_id.to_string());
    }

    async fn unsubscribe(&self, topic: &str, id: &str) {
        let mut subs = self.subscriptions.write().await;
        if let Some(by_id) = subs.get_mut(topic) {
            by_id.remove(id);
        }
    }
}

// 3. Define Worker Configuration
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomEventModuleConfig {
    #[serde(default)]
    pub adapter: Option<AdapterEntry>,
}

// 4. Implement Worker
#[derive(Clone)]
pub struct CustomEventModule {
    adapter: Arc<dyn CustomEventAdapter>,
    engine: Arc<Engine>,
    _config: CustomEventModuleConfig,
}

#[async_trait]
impl CoreWorker for CustomEventModule {
    fn name(&self) -> &str {
        "custom_event"
    }

    async fn initialize(&self) -> anyhow::Result<()> {
        // Register emit function
        self.engine.register_function(
            RegisterFunctionRequest {
                function_id: "custom::emit".to_string(),
                description: Some("Emit a custom event".to_string()),
                request_format: Some(serde_json::json!({
                    "type": "object",
                    "properties": {
                        "topic": { "type": "string" },
                        "data": { "type": "object" }
                    },
                    "required": ["topic", "data"]
                })),
                response_format: None,
                metadata: None,
            },
            Box::new(self.clone()),
        );

        // Register trigger type
        let trigger_type = TriggerType {
            id: "event".to_string(),
            registrator: Box::new(self.clone()),
            description: Some("Event-based trigger".to_string()),
        };
        self.engine.register_trigger_type(trigger_type).await?;

        Ok(())
    }
}

#[async_trait]
impl FunctionHandler for CustomEventModule {
    async fn handle(&self, input: Value) -> FunctionResult {
        let topic = input.get("topic").and_then(|v| v.as_str()).unwrap_or("");
        let data = input.get("data").cloned().unwrap_or(Value::Null);

        self.adapter.emit(topic, data).await;
        FunctionResult::Success(None)
    }
}

impl TriggerRegistrator for CustomEventModule {
    fn register_trigger(
        &self,
        trigger: Trigger,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
        Box::pin(async move {
            let config = trigger.config;
            let subscribes = config.get("subscribes")
                .and_then(|v| v.as_array())
                .ok_or_else(|| anyhow::anyhow!("Missing 'subscribes' array"))?;

            for topic in subscribes {
                let topic_str = topic.as_str()
                    .ok_or_else(|| anyhow::anyhow!("Invalid topic"))?;
                self.adapter.subscribe(topic_str, &trigger.id, &trigger.function_id).await;
            }

            Ok(())
        })
    }
}
```

### iii-config.yaml

```yaml theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
workers:
  - name: my::CustomEventModule
    config:
      adapter:
        name: my::InMemoryEventAdapter
```

## Best Practices

<AccordionGroup>
  <Accordion title="Use Adapter Pattern">
    Always use adapters for external integrations to allow swapping implementations.

    ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    // Good: Adapter-based design
    #[async_trait]
    pub trait StorageAdapter: Send + Sync {
        async fn save(&self, key: &str, value: Value);
    }

    // Avoid: Hard-coded implementation
    pub struct Worker {
        redis: RedisClient,  // Tightly coupled
    }
    ```
  </Accordion>

  <Accordion title="Implement Graceful Shutdown">
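    Release resources in the `destroy()` lifecycle method listed for `CoreWorker` where possible. A `Drop` implementation can serve as a fallback, but `CustomEventModule` derives `Clone` and hands clones to the engine as handlers, so `drop` runs once per clone rather than once per worker.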

    ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    impl Drop for CustomEventModule {
        fn drop(&mut self) {
            // Clean up resources
            tracing::info!("Shutting down CustomEventModule");
        }
    }
    ```
  </Accordion>

  <Accordion title="Provide JSON Schemas">
    Always define request and response formats for functions to enable validation and documentation.

    ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    request_format: Some(serde_json::json!({
        "type": "object",
        "properties": {
            "email": { "type": "string", "format": "email" },
            "age": { "type": "number", "minimum": 0 }
        },
        "required": ["email"]
    }))
    ```
  </Accordion>

  <Accordion title="Use Structured Logging">
    Use the `tracing` crate for structured logging with context.

    ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    tracing::info!(
        topic = %topic,
        subscriber_count = by_id.len(),
        "Emitting event to subscribers"
    );
    ```
  </Accordion>
</AccordionGroup>
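
One caveat for the graceful-shutdown practice: when a worker type is `Clone` and clones are passed around as handlers, a `Drop` implementation fires for every clone. The sketch below demonstrates this with the standard library only; `WorkerSketch` and `count_drops` are hypothetical names used for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical worker-like type: cheap to clone, shares a drop counter.
#[derive(Clone)]
struct WorkerSketch {
    drops: Arc<AtomicUsize>,
}

impl Drop for WorkerSketch {
    fn drop(&mut self) {
        // Runs for every clone, not once per logical worker.
        self.drops.fetch_add(1, Ordering::SeqCst);
    }
}

/// Clones the worker `clones` times, drops everything, and reports how many
/// times `Drop::drop` ran.
fn count_drops(clones: usize) -> usize {
    let drops = Arc::new(AtomicUsize::new(0));
    {
        let worker = WorkerSketch { drops: Arc::clone(&drops) };
        let handlers: Vec<WorkerSketch> = (0..clones).map(|_| worker.clone()).collect();
        drop(handlers);
    } // `worker` itself is dropped here
    drops.load(Ordering::SeqCst)
}
```

Here `count_drops(2)` reports three drops: two handler clones plus the original. Cleanup that must run exactly once is better tied to state with a single owner, for example a resource held behind an `Arc` whose own `Drop` fires only when the last clone goes away.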

## Next Steps

<CardGroup cols={2}>
  <Card title="Core Workers" href="../workers" icon="cubes">
    Explore built-in core workers
  </Card>

  <Card title="Adapters" href="./adapters" icon="gear">
    Learn more about the adapter pattern
  </Card>
</CardGroup>
