Streams
Build durable streams for real-time data subscriptions.
modules::streams::StreamModule
Sample Configuration
- class: modules::streams::StreamModule
  config:
    port: ${STREAMS_PORT:31112}
    host: 0.0.0.0
    auth_function: motia.streams.authenticate
    adapter:
      class: modules::streams::adapters::RedisAdapter
      config:
        redis_url: ${REDIS_URL:redis://localhost:6379}
Configuration
auth_function: The path of the function used to authenticate clients. Register the function with the iii SDK, then reference its path here.
adapter: The adapter used to store the streams. Register the adapter with the iii SDK, then reference its path here.
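The sample configuration uses placeholders such as ${STREAMS_PORT:31112}. These appear to follow a "environment variable name, then default value" pattern, although the exact resolution rules are not described here. The sketch below shows one way such a placeholder could be resolved under that assumption. The resolvePlaceholders helper is hypothetical and is not part of the iii SDK.

```typescript
// Hypothetical helper, not part of the iii SDK.
// Assumption: `${NAME:default}` means "the value of env var NAME, else default".
type Env = Record<string, string | undefined>

function resolvePlaceholders(value: string, env: Env): string {
  // NAME ends at the first colon; the default itself may contain colons
  // (for example a URL), so it runs up to the closing brace.
  return value.replace(
    /\$\{([A-Za-z_][A-Za-z0-9_]*):([^}]*)\}/g,
    (_match: string, name: string, fallback: string) => env[name] ?? fallback,
  )
}

// No REDIS_URL set, so the default after the first colon is used.
const withDefault = resolvePlaceholders('${REDIS_URL:redis://localhost:6379}', {})

// STREAMS_PORT is set, so it overrides the default of 31112.
const withOverride = resolvePlaceholders('${STREAMS_PORT:31112}', { STREAMS_PORT: '4000' })

console.log(withDefault, withOverride)
```

Under this reading, the port and the Redis URL can be changed per environment without editing the configuration file.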
Adapters
modules::streams::adapters::RedisAdapter
Uses Redis as the backend for the streams. Stream data is stored in the Redis key/value store, and Redis Pub/Sub is used to publish and subscribe to stream updates.
class: modules::streams::adapters::RedisAdapter
config:
  redis_url: ${REDIS_URL:redis://localhost:6379}
Configuration
redis_url: The URL of the Redis instance to connect to.
modules::streams::adapters::KvStore
Uses a key/value store as the backend for the streams. Depending on store_method, the streams data is kept in memory or saved to a file.
class: modules::streams::adapters::KvStore
config:
  store_method: file_based
  file_path: ./data/streams_store.db
Configuration
store_method: The storage method to use. The options are:
- in_memory
- file_based
file_path: If you choose the file_based storage method, the path of the file where the data is saved and read.
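To illustrate the difference between the two store_method options, here is a minimal sketch of a key/value store that either keeps data only in memory or also writes it to a file. It is an illustration under stated assumptions, not the KvStore adapter itself: the SketchKvStore class, its JSON file format, and the temporary file path are all hypothetical.

```typescript
import * as fs from 'node:fs'
import * as os from 'node:os'
import * as path from 'node:path'

// Hypothetical illustration only; the real adapter's internals and file
// format are not documented here.
type StoreMethod = 'in_memory' | 'file_based'

class SketchKvStore {
  private data = new Map<string, string>()
  private method: StoreMethod
  private filePath?: string

  constructor(method: StoreMethod, filePath?: string) {
    this.method = method
    this.filePath = filePath
    if (method === 'file_based') {
      if (!filePath) throw new Error('file_path is required for file_based')
      // Load previously saved entries if the file already exists.
      if (fs.existsSync(filePath)) {
        const entries: [string, string][] = JSON.parse(fs.readFileSync(filePath, 'utf8'))
        this.data = new Map(entries)
      }
    }
  }

  set(key: string, value: string): void {
    this.data.set(key, value)
    if (this.method === 'file_based' && this.filePath) {
      // Rewrite the whole file on every change; simple, but slow for large stores.
      fs.mkdirSync(path.dirname(this.filePath), { recursive: true })
      fs.writeFileSync(this.filePath, JSON.stringify(Array.from(this.data.entries())))
    }
  }

  get(key: string): string | undefined {
    return this.data.get(key)
  }
}

// A file_based store survives being reopened; an in_memory store starts empty.
const sketchFile = path.join(os.tmpdir(), `streams_store_sketch_${process.pid}.json`)
new SketchKvStore('file_based', sketchFile).set('todos:user-1:item-1', '{"done":false}')
const persisted = new SketchKvStore('file_based', sketchFile).get('todos:user-1:item-1')
const volatile = new SketchKvStore('in_memory').get('todos:user-1:item-1')
fs.rmSync(sketchFile, { force: true })
console.log(persisted, volatile)
```

In this sketch, in_memory suits tests and local development where losing data on restart is acceptable, while file_based keeps data across restarts on a single machine.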
Functions
Retrieves a group from the stream. This function will return all the items in the group.
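As a rough mental model for group retrieval, the sketch below stores items by stream name, group ID, and item ID, and returns every item in a group. The nested-map layout and the setItem and getGroup helpers are assumptions made for illustration; they are not the module's actual function names or storage format.

```typescript
// Hypothetical data model: stream name -> group id -> item id -> item.
type Item = { id: string; data: unknown }

const streams = new Map<string, Map<string, Map<string, Item>>>()

// Hypothetical helper: add or replace one item in a group.
function setItem(streamName: string, groupId: string, item: Item): void {
  const groups = streams.get(streamName) ?? new Map<string, Map<string, Item>>()
  const group = groups.get(groupId) ?? new Map<string, Item>()
  group.set(item.id, item)
  groups.set(groupId, group)
  streams.set(streamName, groups)
}

// Hypothetical helper: return all the items in a group,
// or an empty array if the stream or group does not exist.
function getGroup(streamName: string, groupId: string): Item[] {
  const group = streams.get(streamName)?.get(groupId)
  return group ? Array.from(group.values()) : []
}

setItem('todos', 'user-1', { id: 'item-1', data: { title: 'Write docs' } })
setItem('todos', 'user-1', { id: 'item-2', data: { title: 'Review PR' } })

const userItems = getGroup('todos', 'user-1')
const emptyGroup = getGroup('todos', 'nobody')
console.log(userItems.length, emptyGroup.length)
```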
Authentication
It's possible to implement a function to handle authentication.
- Define a function to handle the authentication. It receives a single argument with the request data.
  function onAuth(input: StreamAuthInput) {
    console.log('Authenticated request', input)
    return {
      context: { name: 'John Doe' },
    }
  }
  bridge.registerFunction({
    function_path: 'onAuth',
    handler: onAuth,
  })
- Make sure you add the function to the configuration file.
  - class: modules::streams::StreamModule
    config:
      auth_function: onAuth # Same name as the function you registered
- Now whenever someone opens a websocket connection, the function onAuth will be called with the request data.
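An authentication function will usually derive the context from the request rather than return a fixed value. The sketch below shows one possible approach, assuming the request data exposes headers. The SketchAuthInput shape, the token table, and the authenticated flag are hypothetical; the real StreamAuthInput type comes from the iii SDK and may look different. How the module treats a thrown error is not described here, so this sketch always returns a context.

```typescript
// Hypothetical input shape; check the SDK's StreamAuthInput for the real fields.
type SketchAuthInput = { headers: Record<string, string | undefined> }
type SketchContext = { name: string; authenticated: boolean }

// Hypothetical token table standing in for a real user lookup.
const usersByToken = new Map<string, string>([['secret-token', 'John Doe']])

function onAuthSketch(input: SketchAuthInput): { context: SketchContext } {
  // Expect a header of the form "Bearer <token>".
  const header = input.headers['authorization'] ?? ''
  const token = header.startsWith('Bearer ') ? header.slice('Bearer '.length) : ''
  const name = usersByToken.get(token)

  // Same return shape as onAuth above: an object with a `context` property.
  return name
    ? { context: { name, authenticated: true } }
    : { context: { name: 'anonymous', authenticated: false } }
}

const known = onAuthSketch({ headers: { authorization: 'Bearer secret-token' } })
const unknown = onAuthSketch({ headers: {} })
console.log(known.context, unknown.context)
```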
Trigger Types
Two new trigger types are added to the iii SDK: streams:join and streams:leave.
Both have the same parameters when invoked.
Sample Code
type JoinInput<TContext = object> = {
  subscription_id: string
  stream_name: string
  group_id: string
  id: string
  context: TContext
}
function onJoin(input: JoinInput<any>) {
  console.log('Joined stream', input)
}
bridge.registerFunction({
  function_path: 'onJoin',
  handler: onJoin,
})
bridge.registerTrigger({
  trigger_type: 'streams:join',
  function_path: 'onJoin',
})
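Since streams:leave takes the same parameters as streams:join, a leave handler can mirror the sample above. The sketch below registers one and simulates a single invocation. To keep it runnable on its own, it defines a small stand-in bridge object; that stand-in, the LeaveInput type name, and the simulated input values are assumptions, and a real application would use the bridge provided by the iii SDK.

```typescript
// Same fields as JoinInput above; the LeaveInput name is hypothetical.
type LeaveInput<TContext = object> = {
  subscription_id: string
  stream_name: string
  group_id: string
  id: string
  context: TContext
}

type LeaveHandler = (input: LeaveInput<any>) => void

// Stand-in for the SDK bridge so this sketch runs without the SDK.
const handlers = new Map<string, LeaveHandler>()
const triggers = new Map<string, string>()
const bridge = {
  registerFunction(opts: { function_path: string; handler: LeaveHandler }): void {
    handlers.set(opts.function_path, opts.handler)
  },
  registerTrigger(opts: { trigger_type: string; function_path: string }): void {
    triggers.set(opts.trigger_type, opts.function_path)
  },
}

// Record which subscriptions have left, for example to release resources.
const leftSubscriptions: string[] = []

function onLeave(input: LeaveInput<any>) {
  leftSubscriptions.push(input.subscription_id)
  console.log('Left stream', input)
}

bridge.registerFunction({
  function_path: 'onLeave',
  handler: onLeave,
})
bridge.registerTrigger({
  trigger_type: 'streams:leave',
  function_path: 'onLeave',
})

// Simulate the module firing the trigger when a client disconnects.
const target = triggers.get('streams:leave')
if (target) {
  handlers.get(target)?.({
    subscription_id: 'sub-1',
    stream_name: 'todos',
    group_id: 'user-1',
    id: 'item-1',
    context: { name: 'John Doe' },
  })
}
```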