iii

Streams

Build durable streams for real-time data subscriptions.

modules::streams::StreamModule

Sample Configuration

- class: modules::streams::StreamModule
  config:
    port: ${STREAMS_PORT:31112}
    host: 0.0.0.0
    auth_function: motia.streams.authenticate
    adapter:
      class: modules::streams::adapters::RedisAdapter
      config:
        redis_url: ${REDIS_URL:redis://localhost:6379}
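The values above use the form ${NAME:default}. The sketch below shows one plausible reading of that syntax: the environment variable wins when it is set, otherwise the text after the first colon is used. The helper name resolvePlaceholders and the exact fallback rules are assumptions made for illustration, not the engine's actual parser.

```typescript
// Hypothetical helper: resolves "${NAME:default}" placeholders in a config value.
// The fallback semantics are an assumption inferred from the sample configuration.
type Env = Record<string, string | undefined>

function resolvePlaceholders(value: string, env: Env): string {
  // NAME is an identifier; the default is everything after the first colon
  // up to the closing brace, so URLs containing colons are kept whole.
  const pattern = /\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}/g
  return value.replace(pattern, (_match: string, name: string, fallback?: string) => {
    const fromEnv = env[name]
    if (fromEnv !== undefined) return fromEnv
    return fallback ?? ''
  })
}

// No variable set: the default after the colon is used.
const withDefault = resolvePlaceholders('${STREAMS_PORT:31112}', {})
// Variable set: the environment value replaces the default.
const withOverride = resolvePlaceholders('${STREAMS_PORT:31112}', { STREAMS_PORT: '4000' })
// Defaults may themselves contain colons.
const redisUrl = resolvePlaceholders('${REDIS_URL:redis://localhost:6379}', {})
```

Under this reading, exporting STREAMS_PORT or REDIS_URL before starting the engine overrides the defaults shown in the sample.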

Configuration

port
number

The port to listen on. Defaults to 31112.

host
string

The host to listen on. Defaults to 0.0.0.0.

auth_function
string

The path of the function used to authenticate clients. Register the function with the iii SDK, then reference its path here.

adapter
Adapter

The adapter used to store the streams. Register the adapter with the iii SDK, then reference its path here.

Adapters

modules::streams::adapters::RedisAdapter

Uses Redis as the backend for the streams. Stream data is kept in the Redis key/value store, and Redis Pub/Sub is used to publish and subscribe to the streams.

class: modules::streams::adapters::RedisAdapter
config:
  redis_url: ${REDIS_URL:redis://localhost:6379}

Configuration

redis_url
string

The URL of the Redis instance to use.

modules::streams::adapters::KvStore

Uses a key/value store as the backend for the streams. Depending on store_method, the streams data is kept in memory or persisted to a file.

class: modules::streams::adapters::KvStore
config:
  store_method: file_based
  file_path: ./data/streams_store.db

Configuration

store_method
string

The storage method to use. The options are in_memory and file_based.

file_path
string

The path of the file to read from and write to. Used when store_method is file_based.

Functions

streams.set
function

Sets a value in the stream.

streams.get
function

Gets a value from the stream.

streams.delete
function

Deletes a value from the stream.

streams.getGroup
function

Retrieves a group from the stream. This function will return all the items in the group.
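To make the relationship between the four functions concrete, here is a minimal in-memory model of the stream, group, and item layout suggested by the stream_name, group_id, and id parameters used elsewhere on this page. The class name InMemoryStreams and the method signatures are assumptions for illustration; the real functions are invoked through the iii SDK and their exact arguments are not specified here.

```typescript
// In-memory model of the stream -> group -> item layout. Not the iii implementation.
type Item = Record<string, unknown>

class InMemoryStreams {
  // stream_name -> group_id -> item id -> item data
  private data = new Map<string, Map<string, Map<string, Item>>>()

  // Models streams.set: creates the stream and group on demand, then stores the item.
  set(streamName: string, groupId: string, id: string, item: Item): Item {
    let groups = this.data.get(streamName)
    if (!groups) {
      groups = new Map()
      this.data.set(streamName, groups)
    }
    let items = groups.get(groupId)
    if (!items) {
      items = new Map()
      groups.set(groupId, items)
    }
    items.set(id, item)
    return item
  }

  // Models streams.get: returns one item, or undefined when it does not exist.
  get(streamName: string, groupId: string, id: string): Item | undefined {
    return this.data.get(streamName)?.get(groupId)?.get(id)
  }

  // Models streams.delete: returns true only when an item was actually removed.
  delete(streamName: string, groupId: string, id: string): boolean {
    return this.data.get(streamName)?.get(groupId)?.delete(id) ?? false
  }

  // Models streams.getGroup: returns every item in the group, in insertion order.
  getGroup(streamName: string, groupId: string): Item[] {
    return Array.from(this.data.get(streamName)?.get(groupId)?.values() ?? [])
  }
}

const streams = new InMemoryStreams()
streams.set('todos', 'user-1', 'a', { title: 'Write docs' })
streams.set('todos', 'user-1', 'b', { title: 'Ship release' })
const groupBeforeDelete = streams.getGroup('todos', 'user-1')
const removed = streams.delete('todos', 'user-1', 'a')
const groupAfterDelete = streams.getGroup('todos', 'user-1')
```

In this model a group is simply the set of items that share a stream name and group ID, which is why getGroup needs no item ID.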

Authentication

It's possible to implement a function to handle authentication.

  1. Define a function to handle the authentication. It receives a single argument containing the request data.
function onAuth(input: StreamAuthInput) {
  console.log('Authenticated request', input)

  // The returned context is made available to stream triggers as `context`.
  return {
    context: { name: 'John Doe' },
  }
}

bridge.registerFunction({
  function_path: 'onAuth',
  handler: onAuth,
})
  2. Add the function to the configuration file.
- class: modules::streams::StreamModule
  config:
    auth_function: onAuth # Same name as the function you registered
  3. Now, whenever a client opens a WebSocket connection, the function onAuth is called with the request data.
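As a slightly more realistic variant of the steps above, the sketch below derives the context from a bearer token instead of returning a fixed name. The headers field on StreamAuthInput, the knownTokens table, and the AuthContext shape are all assumptions for illustration; the actual StreamAuthInput type comes from the iii SDK, and how the engine treats a rejected request is not covered here.

```typescript
// Assumed shape of the request data; the real StreamAuthInput type comes from the iii SDK.
type StreamAuthInput = { headers: Record<string, string | undefined> }

// Hypothetical context shape: userId is null for unrecognized clients.
type AuthContext = { userId: string | null }

// Hypothetical token table standing in for a real session or JWT check.
const knownTokens = new Map<string, string>([['secret-token', 'user-1']])

function onAuth(input: StreamAuthInput): { context: AuthContext } {
  const header = input.headers['authorization'] ?? ''
  // Accept only "Bearer <token>"; anything else is treated as no token.
  const token = header.startsWith('Bearer ') ? header.slice('Bearer '.length) : ''
  const userId = knownTokens.get(token) ?? null
  // Same return shape as the example above: an object with a `context` property.
  return { context: { userId } }
}

const knownClient = onAuth({ headers: { authorization: 'Bearer secret-token' } })
const unknownClient = onAuth({ headers: {} })
```

The function would still be registered with bridge.registerFunction and referenced from auth_function exactly as in the steps above.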

Trigger Types

Two new trigger types are added to the iii SDK: streams:join and streams:leave. Both have the same parameters when invoked.

subscription_id
string
required

The subscription ID, mostly used for uniqueness or logging purposes.

stream_name
string
required

The stream name of the subscription.

group_id
string
required

The Group ID of the subscription.

id
string
required

The item ID of the subscription.

context
object

The context generated by the authentication layer.

Sample Code

type JoinInput<TContext = object> = {
  subscription_id: string
  stream_name: string
  group_id: string
  id: string
  context: TContext
}

function onJoin(input: JoinInput<any>) {
  console.log('Joined stream', input)
}

bridge.registerFunction({
  function_path: 'onJoin',
  handler: onJoin,
})

bridge.registerTrigger({
  trigger_type: 'streams:join',
  function_path: 'onJoin',
})
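Because streams:leave receives the same parameters as streams:join, the two can be paired to track which subscriptions are currently open. The sketch below keeps that state in a Map keyed by subscription_id. The bridge object here is a local stand-in so the example runs on its own, and the fire helper only simulates the engine invoking a trigger; both are assumptions, and with the real SDK you would use its bridge instead.

```typescript
type SubscriptionInput<TContext = object> = {
  subscription_id: string
  stream_name: string
  group_id: string
  id: string
  context?: TContext
}

type Handler = (input: SubscriptionInput) => void

// Active subscriptions keyed by subscription_id.
const active = new Map<string, SubscriptionInput>()

function onJoin(input: SubscriptionInput) {
  active.set(input.subscription_id, input)
}

function onLeave(input: SubscriptionInput) {
  active.delete(input.subscription_id)
}

// Stand-in for the SDK bridge (assumption): it only records registrations.
const handlers = new Map<string, Handler>()
const triggers = new Map<string, string>()
const bridge = {
  registerFunction(opts: { function_path: string; handler: Handler }) {
    handlers.set(opts.function_path, opts.handler)
  },
  registerTrigger(opts: { trigger_type: string; function_path: string }) {
    triggers.set(opts.trigger_type, opts.function_path)
  },
}

bridge.registerFunction({ function_path: 'onJoin', handler: onJoin })
bridge.registerFunction({ function_path: 'onLeave', handler: onLeave })
bridge.registerTrigger({ trigger_type: 'streams:join', function_path: 'onJoin' })
bridge.registerTrigger({ trigger_type: 'streams:leave', function_path: 'onLeave' })

// Simulates the engine firing a trigger; used only to exercise the sketch.
function fire(triggerType: string, input: SubscriptionInput) {
  const path = triggers.get(triggerType)
  const handler = path ? handlers.get(path) : undefined
  if (handler) handler(input)
}
```

Keying the map by subscription_id matches its stated purpose of uniqueness: the same client can subscribe to several items without the entries colliding.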
