RedisStreamsPubSub

RedisStreamsPubSub is a PubSub implementation backed by Redis Streams. It delivers events across processes and hosts, with persistence, consumer groups, and redelivery on failure.

Use it for distributed deployments where several services share an event stream. For single-process delivery, use EventEmitterPubSub. For Google Cloud, use GoogleCloudPubSub.

Each topic maps to a Redis stream key. Subscriptions with a group use a Redis consumer group, so members share work round-robin. Subscriptions without a group create a private consumer group, so every subscriber receives every event.
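The two delivery modes can be illustrated with a small in-memory model. This is only a sketch of the semantics described above, not the library's implementation: it does not talk to Redis, the `Subscriber` type and `deliver` function are hypothetical, and round-robin is simplified to a strict rotation.

```typescript
// Hypothetical in-memory model of the two subscription modes.
// It does not use Redis; it only mirrors the delivery semantics described above.
type Subscriber = { id: string; group?: string; received: string[] };

function deliver(events: string[], subscribers: Subscriber[]): void {
  // Subscribers that name a group share one consumer group; each subscriber
  // without a group gets a private group of its own (keyed by its id here).
  const groups = new Map<string, Subscriber[]>();
  for (const sub of subscribers) {
    const key = sub.group ?? `private:${sub.id}`;
    const members = groups.get(key) ?? [];
    members.push(sub);
    groups.set(key, members);
  }

  // Every group sees every event; inside a group, exactly one member gets it.
  events.forEach((event, i) => {
    groups.forEach((members) => {
      members[i % members.length].received.push(event);
    });
  });
}

const a: Subscriber = { id: 'a', group: 'workers', received: [] };
const b: Subscriber = { id: 'b', group: 'workers', received: [] };
const c: Subscriber = { id: 'c', received: [] };

deliver(['e1', 'e2', 'e3', 'e4'], [a, b, c]);

console.log(a.received); // a and b split the four events between them
console.log(b.received);
console.log(c.received); // c has no group, so it receives all four
```

In this model, adding another member to `workers` spreads the same events across more subscribers, while adding another ungrouped subscriber duplicates the full stream to it.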

RedisStreamsPubSub is a pull transport: consumers read events with XREADGROUP, so Mastra runs an orchestration worker to read on its behalf.
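As a rough picture of what a pull transport implies, the sketch below shows a generic read loop. It is not Mastra's worker: `pullLoop`, `ReadBatch`, and `Entry` are hypothetical names, and the Redis call is replaced by an injected function so the loop can be run without a server.

```typescript
// Hypothetical pull loop; `ReadBatch` stands in for one blocking read, which in
// Redis terms is: XREADGROUP GROUP <group> <consumer> BLOCK <blockMs> STREAMS <key> >
type Entry = { id: string; data: string };
type ReadBatch = (blockMs: number) => Promise<Entry[]>;

async function pullLoop(
  read: ReadBatch,
  handle: (entry: Entry) => Promise<void>,
  shouldStop: () => boolean,
  blockMs: number = 1000,
): Promise<void> {
  while (!shouldStop()) {
    // Resolves with an empty batch when the block times out with nothing new.
    const batch = await read(blockMs);
    for (const entry of batch) {
      await handle(entry);
    }
  }
}

// Usage with a fake reader that serves three batches and then signals stop.
const batches: Entry[][] = [
  [{ id: '1-0', data: 'first' }],
  [],
  [{ id: '2-0', data: 'second' }],
];
const handled: string[] = [];

const done = pullLoop(
  async () => batches.shift() ?? [],
  async (entry) => { handled.push(entry.data); },
  () => batches.length === 0,
);
```

Nothing arrives unless some process keeps issuing reads, which is why a pull transport needs a long-running worker.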

Installation

npm install @mastra/redis-streams

Usage example

Provide a Redis connection URL.

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { RedisStreamsPubSub } from '@mastra/redis-streams'

export const mastra = new Mastra({
  pubsub: new RedisStreamsPubSub({
    url: 'redis://localhost:6379',
  }),
})

Constructor parameters

url?: string = redis://localhost:6379
Redis connection URL. Falls back to `redisOptions.url`.

keyPrefix?: string = mastra:topic
Prefix for stream keys. Each topic maps to `<keyPrefix>:<topic>`.

blockMs?: number = 1000
How long, in milliseconds, each read blocks while waiting for new events.

redisOptions?: RedisClientOptions
Options passed to the underlying `redis` client for advanced configuration.

maxStreamLength?: number = 10000
Approximate maximum number of entries kept per stream. Set to 0 to disable trimming.

reclaimIntervalMs?: number = 30000
How often, in milliseconds, a subscription reclaims events that an earlier consumer read but never acknowledged. Set to 0 to disable.

reclaimIdleMs?: number = 60000
Minimum idle time, in milliseconds, before a pending event is eligible for reclaim. Keep this well above typical processing time to avoid double delivery.

maxDeliveryAttempts?: number = 5
Maximum times an event is redelivered through `nack` before it is dropped. Pass `Infinity` to disable the cap.

logger?: { debug?: Function; warn?: Function }
Optional logger for diagnostics. When omitted, suppressed errors are silent.
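The sketch below shows how a few of these options might be combined for slow handlers. The interface is a local mirror of the documented parameters, not a type imported from the package; the host name, the `orders` prefix, and the `streamKey` helper are hypothetical and only illustrate the documented `<keyPrefix>:<topic>` layout.

```typescript
// Local mirror of the documented constructor options (not the package's own type).
interface RedisStreamsOptionsSketch {
  url?: string;
  keyPrefix?: string;
  blockMs?: number;
  maxStreamLength?: number;
  reclaimIntervalMs?: number;
  reclaimIdleMs?: number;
  maxDeliveryAttempts?: number;
}

// Defaults as listed in the parameter reference above.
const defaults = {
  url: 'redis://localhost:6379',
  keyPrefix: 'mastra:topic',
  blockMs: 1000,
  maxStreamLength: 10000,
  reclaimIntervalMs: 30000,
  reclaimIdleMs: 60000,
  maxDeliveryAttempts: 5,
};

// Example overrides, assuming handlers that can take about a minute per event.
const options: RedisStreamsOptionsSketch = {
  url: 'redis://redis.internal:6379', // hypothetical host
  keyPrefix: 'orders',                // hypothetical prefix
  reclaimIdleMs: 300000,              // well above the assumed processing time
  maxDeliveryAttempts: 3,
};

// Effective settings: anything not overridden keeps its documented default.
const resolved = { ...defaults, ...options };

// Hypothetical helper showing the documented key layout.
function streamKey(keyPrefix: string, topic: string): string {
  return `${keyPrefix}:${topic}`;
}

console.log(streamKey(resolved.keyPrefix, 'workflow.events')); // orders:workflow.events
```

The same `options` object is what would be passed to `new RedisStreamsPubSub(options)`.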

Properties

supportedModes: ReadonlyArray<"pull" | "push">
Returns `["pull"]`.

Methods

RedisStreamsPubSub implements the PubSub contract. The methods below have behavior specific to this implementation.

subscribe(topic, cb, options?)

Subscribes to a topic. With options.group, members of the group share events through a Redis consumer group. Without a group, the subscriber receives every event through a private consumer group.

await pubsub.subscribe('workflow.events', (event, ack, nack) => {
  console.log(event)
})
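A common pattern is to acknowledge an event only after the handler succeeds and to nack it when the handler throws. The helper below is a sketch of that pattern under stated assumptions: the reference shows a callback receiving `(event, ack, nack)` but does not give their signatures, so the `Ack` and `Nack` types, the `ackOnSuccess` name, and the `workers` group name are all hypothetical.

```typescript
// Assumed signatures: the reference shows a callback receiving (event, ack, nack)
// but does not spell out their types, so these are guesses for illustration.
type Ack = () => Promise<void> | void;
type Nack = () => Promise<void> | void;

// Hypothetical helper: acknowledge when the handler succeeds, nack when it throws.
function ackOnSuccess<E>(handler: (event: E) => Promise<void>) {
  return async (event: E, ack: Ack, nack: Nack): Promise<void> => {
    try {
      await handler(event);
    } catch {
      await nack(); // hands the event to the redelivery path
      return;
    }
    await ack();
  };
}

// With a real instance this would be passed to subscribe, for example:
// await pubsub.subscribe('workflow.events', ackOnSuccess(handleEvent), { group: 'workers' })
```

Keeping `ack` outside the `try` block means a failure while acknowledging is not mistaken for a handler failure.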

flush()

Waits for in-flight publishes to complete.

await pubsub.flush()

close()

Closes the Redis connections and stops all subscriptions. Call this during graceful shutdown.

await pubsub.close()
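For graceful shutdown, one reasonable ordering is to flush pending publishes first and then close. The helper below is a sketch of that ordering, not something the package provides: `shutdown` and `Closable` are hypothetical names, and the signal wiring in the trailing comment assumes a Node.js service.

```typescript
// Minimal structural type: only the two methods documented above.
type Closable = { flush(): Promise<void>; close(): Promise<void> };

// Hypothetical shutdown helper: drain pending publishes, then release connections.
async function shutdown(pubsub: Closable): Promise<void> {
  try {
    await pubsub.flush();
  } finally {
    // Close even if flush rejects, so connections are not left open.
    await pubsub.close();
  }
}

// Typical wiring in a Node.js service (assumed, not from the reference):
// process.once('SIGTERM', () => { void shutdown(pubsub) })
```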

Redelivery and reclaim

When a subscriber calls nack, the event is republished with an incremented deliveryAttempt and the original is acknowledged. Once an event reaches maxDeliveryAttempts, it is dropped instead of redelivered. Separately, each subscription periodically reclaims events that an earlier consumer in the group read but never acknowledged, controlled by reclaimIntervalMs and reclaimIdleMs.
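The two rules can be modelled in a few lines. This is a hypothetical model of the decisions described above, not the library's code: `onNack`, `isReclaimable`, and the `Delivery` type are illustrative names, and it assumes the first delivery counts as attempt 1.

```typescript
// Hypothetical model of the nack decision; not the library's code.
type Delivery = { payload: string; deliveryAttempt: number };

type NackOutcome =
  | { action: 'republish'; next: Delivery }
  | { action: 'drop' };

function onNack(event: Delivery, maxDeliveryAttempts: number = 5): NackOutcome {
  // Assumption: the first delivery is attempt 1, and an event whose attempt
  // count has reached the cap is dropped rather than republished.
  if (event.deliveryAttempt >= maxDeliveryAttempts) {
    return { action: 'drop' };
  }
  return {
    action: 'republish',
    next: { ...event, deliveryAttempt: event.deliveryAttempt + 1 },
  };
}

// A pending entry is eligible for reclaim once it has been idle long enough.
function isReclaimable(idleMs: number, reclaimIdleMs: number = 60000): boolean {
  return idleMs >= reclaimIdleMs;
}

// Walk one event through repeated failures with a cap of 3.
let current: Delivery = { payload: 'job-1', deliveryAttempt: 1 };
const trail: number[] = [current.deliveryAttempt];
for (;;) {
  const outcome = onNack(current, 3);
  if (outcome.action === 'drop') break;
  current = outcome.next;
  trail.push(current.deliveryAttempt);
}
console.log(trail); // [1, 2, 3]
```

With `maxDeliveryAttempts` set to `Infinity`, the comparison in `onNack` never succeeds, so the model keeps republishing, which matches the documented way to disable the cap.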