CachingPubSub
CachingPubSub wraps any PubSub implementation and adds event caching and replay. It records every published event per topic in a cache, so a subscriber that connects late or reconnects after a disconnect can replay the events it missed before continuing with live events.
Use it to build resumable streams on top of a transport that does not keep history, such as EventEmitterPubSub. Transports that already persist events, such as RedisStreamsPubSub, do not need this wrapper.
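The idea of recording events and replaying them to late subscribers can be illustrated with a small standalone model. This is a simplified sketch, not Mastra's implementation: `LiveOnlyBus` and `ReplayingBus` are hypothetical names, everything is synchronous, and the history lives in a plain object rather than a server cache.

```typescript
// Hypothetical, simplified model of caching and replay; not Mastra's code.
type SketchEvent = { type: string; data: unknown }
type Listener = (event: SketchEvent) => void

// Stand-in for a transport that keeps no history.
class LiveOnlyBus {
  private listeners: Record<string, Listener[]> = {}

  publish(topic: string, event: SketchEvent): void {
    for (const listener of this.listeners[topic] || []) listener(event)
  }

  subscribe(topic: string, listener: Listener): void {
    const list = this.listeners[topic] || (this.listeners[topic] = [])
    list.push(listener)
  }
}

// Wrapper that records every event per topic before forwarding it.
class ReplayingBus {
  private inner: LiveOnlyBus
  private history: Record<string, SketchEvent[]> = {}

  constructor(inner: LiveOnlyBus) {
    this.inner = inner
  }

  publish(topic: string, event: SketchEvent): void {
    const log = this.history[topic] || (this.history[topic] = [])
    log.push(event)
    this.inner.publish(topic, event)
  }

  subscribeWithReplay(topic: string, listener: Listener): void {
    // Deliver what was missed first, then attach for live events.
    for (const event of this.history[topic] || []) listener(event)
    this.inner.subscribe(topic, listener)
  }
}

const bus = new ReplayingBus(new LiveOnlyBus())
bus.publish('my-topic', { type: 'first', data: 1 })

// This subscriber connects after the first event was published.
const received: string[] = []
bus.subscribeWithReplay('my-topic', event => {
  received.push(event.type)
})
bus.publish('my-topic', { type: 'second', data: 2 })
```

In this model the late subscriber ends up with both events in publication order, which is the behavior a plain event emitter cannot provide on its own.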
Usage example
Wrap an inner pub/sub and provide a server cache to store events.
import { Mastra } from '@mastra/core'
import { CachingPubSub, EventEmitterPubSub } from '@mastra/core/events'
import { InMemoryServerCache } from '@mastra/core/cache'
const cache = new InMemoryServerCache()
const pubsub = new CachingPubSub(new EventEmitterPubSub(), cache)
export const mastra = new Mastra({
  pubsub,
})
The withCaching helper builds an equivalent CachingPubSub instance and reads better when wrapping inline:
import { withCaching, EventEmitterPubSub } from '@mastra/core/events'
import { InMemoryServerCache } from '@mastra/core/cache'
const pubsub = withCaching(new EventEmitterPubSub(), new InMemoryServerCache())
Constructor parameters
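inner: The PubSub implementation to wrap.
cache: The server cache that stores published events.
options?: Optional configuration, such as keyPrefix.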
Methods
CachingPubSub implements the PubSub contract and overrides the replay methods so that they read from the cache. The sections below describe the caching behavior of each method.
publish(topic, event)
Caches the event with a sequential index, then publishes it to the inner pub/sub.
await pubsub.publish('my-topic', {
  type: 'example',
  data: { value: 1 },
  runId: 'run-123',
})
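To make "sequential index" concrete, the following sketch shows one way a per-topic index could be assigned. `IndexedLog` is a hypothetical helper, and the choice to start indexes at 0 is an assumption; the actual numbering used by CachingPubSub is not specified here.

```typescript
// Hypothetical sketch of per-topic sequential indexing; not Mastra's code.
type IndexedEvent<T> = { index: number; event: T }

class IndexedLog<T> {
  private entries: Record<string, IndexedEvent<T>[]> = {}

  // Append an event and return the index it was given.
  // Assumption: indexes start at 0 and are independent per topic.
  append(topic: string, event: T): number {
    const log = this.entries[topic] || (this.entries[topic] = [])
    const index = log.length
    log.push({ index, event })
    return index
  }

  // Read the entries of a topic whose index is at or after `offset`.
  read(topic: string, offset = 0): IndexedEvent<T>[] {
    return (this.entries[topic] || []).slice(offset)
  }
}

const log = new IndexedLog<{ type: string }>()
const firstIndex = log.append('my-topic', { type: 'example' })
const secondIndex = log.append('my-topic', { type: 'example' })
const otherIndex = log.append('other-topic', { type: 'example' })
```

Because each topic keeps its own counter in this sketch, the index doubles as an offset that a reader can pass back later to skip everything it has already seen.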
subscribeWithReplay(topic, cb)
Replays all cached events for the topic, then subscribes to live events.
await pubsub.subscribeWithReplay('my-topic', event => {
  console.log(event)
})
subscribeFromOffset(topic, offset, cb)
Replays cached events starting at offset, then subscribes to live events. Use this when the client knows its last position, to avoid replaying the whole history.
await pubsub.subscribeFromOffset('my-topic', 42, event => {
  console.log(event)
})
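The reconnect pattern this method supports can be sketched with a standalone model in which the client counts the events it has handled and resumes from that count. `OffsetReplaySource` is a hypothetical, synchronous stand-in, not the Mastra API. It assumes a 0-based, inclusive offset, and its function-returning unsubscribe mechanism exists only for this illustration.

```typescript
// Hypothetical sketch of resuming from a known position; not Mastra's code.
type Handler = (event: string) => void

class OffsetReplaySource {
  private cached: string[] = []
  private live: Handler[] = []

  publish(event: string): void {
    this.cached.push(event)
    // Iterate over a copy so a handler may unsubscribe during delivery.
    for (const handler of this.live.slice()) handler(event)
  }

  // Replays cached events from `offset` (assumed 0-based, inclusive),
  // then delivers live events. Returns a function that disconnects.
  subscribeFromOffset(offset: number, handler: Handler): () => void {
    for (const event of this.cached.slice(offset)) handler(event)
    this.live.push(handler)
    return () => {
      this.live = this.live.filter(h => h !== handler)
    }
  }
}

const source = new OffsetReplaySource()
const seen: string[] = []
let nextOffset = 0

// The client advances its position once per handled event.
const track: Handler = event => {
  seen.push(event)
  nextOffset += 1
}

let disconnect = source.subscribeFromOffset(nextOffset, track)
source.publish('a')
source.publish('b')
disconnect()

// Published while the client is disconnected.
source.publish('c')
source.publish('d')

// Reconnect from the remembered position; only 'c' and 'd' are replayed.
disconnect = source.subscribeFromOffset(nextOffset, track)
source.publish('e')
```

The client sees each event exactly once and in order, without replaying 'a' and 'b' after the reconnect.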
getHistory(topic, offset?)
Returns cached events for the topic, starting at offset.
const events = await pubsub.getHistory('my-topic', 0)
Returns: Promise<Event[]>
Functions
withCaching(pubsub, cache, options?)
Convenience wrapper that constructs a CachingPubSub. Accepts the same arguments as the constructor and returns the new instance.
import { withCaching, EventEmitterPubSub } from '@mastra/core/events'
import { InMemoryServerCache } from '@mastra/core/cache'
const pubsub = withCaching(new EventEmitterPubSub(), new InMemoryServerCache(), {
  keyPrefix: 'events:',
})
Returns: CachingPubSub