RedisStreamsPubSub
RedisStreamsPubSub is a PubSub implementation backed by Redis Streams. It delivers events across processes and hosts, with persistence, consumer groups, and redelivery on failure.
Use it for distributed deployments where several services share an event stream. For single-process delivery, use EventEmitterPubSub. For Google Cloud, use GoogleCloudPubSub.
Each topic maps to a Redis stream key. Subscriptions with a group use a Redis consumer group, so members share work round-robin. Subscriptions without a group create a private consumer group, so every subscriber receives every event.
RedisStreamsPubSub is a pull transport: consumers read events with XREADGROUP, so Mastra runs an orchestration worker to read on its behalf.
InstallationDirect link to Installation
- npm
- pnpm
- Yarn
- Bun
npm install @mastra/redis-streams
pnpm add @mastra/redis-streams
yarn add @mastra/redis-streams
bun add @mastra/redis-streams
Usage exampleDirect link to Usage example
Provide a Redis connection URL.
import { Mastra } from '@mastra/core'
import { RedisStreamsPubSub } from '@mastra/redis-streams'
export const mastra = new Mastra({
pubsub: new RedisStreamsPubSub({
url: 'redis://localhost:6379',
}),
})
Constructor parametersDirect link to Constructor parameters
url?:
keyPrefix?:
blockMs?:
redisOptions?:
maxStreamLength?:
reclaimIntervalMs?:
reclaimIdleMs?:
maxDeliveryAttempts?:
logger?:
PropertiesDirect link to Properties
supportedModes:
MethodsDirect link to Methods
RedisStreamsPubSub implements the PubSub contract. The methods below have behavior specific to this implementation.
subscribe(topic, cb, options?)Direct link to subscribetopic-cb-options
Subscribes to a topic. With options.group, members of the group share events through a Redis consumer group. Without a group, the subscriber receives every event through a private consumer group.
await pubsub.subscribe('workflow.events', (event, ack, nack) => {
console.log(event)
})
flush()Direct link to flush
Waits for in-flight publishes to complete.
await pubsub.flush()
close()Direct link to close
Closes the Redis connections and stops all subscriptions. Call this during graceful shutdown.
await pubsub.close()
Redelivery and reclaimDirect link to Redelivery and reclaim
When a subscriber calls nack, the event is republished with an incremented deliveryAttempt and the original is acknowledged. Once an event reaches maxDeliveryAttempts, it is dropped instead of redelivered. Separately, each subscription periodically reclaims events that an earlier consumer in the group read but never acknowledged, controlled by reclaimIntervalMs and reclaimIdleMs.