PubSub
PubSub is the abstract base class for Mastra's event system. It defines the contract that every pub/sub backend implements, so the rest of Mastra can publish and subscribe to events without knowing which transport is in use.
Mastra uses pub/sub internally for workflow event processing, streaming, and cross-component communication. Most applications use the default EventEmitterPubSub and never construct a PubSub directly. Implement this class only when you need a custom transport.
For built-in implementations, see EventEmitterPubSub, UnixSocketPubSub, CachingPubSub, RedisStreamsPubSub, and GoogleCloudPubSub.
Usage example
Extend PubSub and implement the four abstract methods to add a custom backend.
import { PubSub } from '@mastra/core/events'
import type { Event, EventCallback, SubscribeOptions } from '@mastra/core/events'

export class CustomPubSub extends PubSub {
  async publish(topic: string, event: Omit<Event, 'id' | 'createdAt'>): Promise<void> {
    // Deliver the event to subscribers of `topic`.
  }

  async subscribe(topic: string, cb: EventCallback, options?: SubscribeOptions): Promise<void> {
    // Register `cb` to receive events published to `topic`.
  }

  async unsubscribe(topic: string, cb: EventCallback): Promise<void> {
    // Remove a previously registered callback.
  }

  async flush(): Promise<void> {
    // Wait for any in-flight deliveries to settle.
  }
}
Pass the instance to the Mastra constructor:
import { Mastra } from '@mastra/core'
import { CustomPubSub } from './pubsub'

export const mastra = new Mastra({
  pubsub: new CustomPubSub(),
})
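To make the four-method contract concrete, here is a minimal in-memory sketch. It is not one of Mastra's built-in backends. `PubSubEvent` and `PubSubCallback` are local stand-ins for `Event` and `EventCallback`, with field types assumed from the names listed on this page, and the class is left standalone rather than extending `PubSub` so the block runs without any dependencies. It ignores `SubscribeOptions`, so every subscriber receives every event.

```typescript
// Stand-ins for the `Event` and `EventCallback` types from '@mastra/core/events'.
// The field types are assumptions; only the field names come from this page.
type PubSubEvent = { id: string; type: string; data: unknown; runId: string; createdAt: Date };
type PubSubCallback = (
  event: PubSubEvent,
  ack?: () => Promise<void>,
  nack?: () => Promise<void>,
) => void;

// A real backend would declare `extends PubSub`; this sketch is standalone.
class InMemoryPubSub {
  private subscribers: Record<string, PubSubCallback[]> = {};
  private nextId = 0;

  async publish(topic: string, event: Omit<PubSubEvent, 'id' | 'createdAt'>): Promise<void> {
    // The implementation, not the caller, assigns `id` and `createdAt`.
    const full: PubSubEvent = { ...event, id: String(++this.nextId), createdAt: new Date() };
    // Iterate over a copy so a callback that unsubscribes does not disturb delivery.
    (this.subscribers[topic] || []).slice().forEach(cb => cb(full));
  }

  async subscribe(topic: string, cb: PubSubCallback): Promise<void> {
    (this.subscribers[topic] = this.subscribers[topic] || []).push(cb);
  }

  async unsubscribe(topic: string, cb: PubSubCallback): Promise<void> {
    this.subscribers[topic] = (this.subscribers[topic] || []).filter(existing => existing !== cb);
  }

  async flush(): Promise<void> {
    // Delivery happens synchronously inside publish, so nothing is ever in flight.
  }
}
```

A networked transport would do real work in `flush`, for example waiting for outstanding sends to be acknowledged.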
Delivery modes
A PubSub declares which delivery modes it supports through the supportedModes property. Mastra reads this to decide whether to run a long-lived worker that pulls events.
| Mode | Description |
|---|---|
| pull | Consumers actively read from the broker, for example Redis Streams XREADGROUP. Mastra runs an orchestration worker to read. |
| push | Events arrive without the consumer asking, either in-process or through an HTTP endpoint. No read loop is required. |
The default is ['pull'], so custom implementations keep the existing pull-based behavior unless they opt in to push delivery.
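The decision described above can be sketched as a small predicate. `DeliveryMode` and `needsPullWorker` are hypothetical names used for illustration, not part of Mastra's API, and the rule shown (start a worker whenever 'pull' is among the declared modes) is one plausible reading of the table rather than a description of Mastra's internals.

```typescript
// Hypothetical type name for the two modes in the table above.
type DeliveryMode = 'pull' | 'push';

// Mirrors the documented default for backends that do not declare their modes.
const DEFAULT_MODES: DeliveryMode[] = ['pull'];

// Hypothetical helper: decide whether a long-lived read loop is needed.
function needsPullWorker(pubsub: { supportedModes?: DeliveryMode[] }): boolean {
  const modes = pubsub.supportedModes || DEFAULT_MODES;
  return modes.indexOf('pull') !== -1;
}
```

Under this reading, a backend that declares only ['push'] runs no read loop, while one that declares nothing falls back to the pull default.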
Methods
Core methods
publish(topic, event)
Publishes an event to a topic. The id and createdAt fields are assigned by the implementation.
await pubsub.publish('my-topic', {
  type: 'example',
  data: { value: 1 },
  runId: 'run-123',
})
subscribe(topic, cb, options?)
Registers a callback to receive events published to a topic. When options.group is set, subscribers in the same group compete for messages and each event is delivered to one member. Without a group, every subscriber receives every event.
await pubsub.subscribe('my-topic', (event, ack, nack) => {
  console.log(event)
})
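The difference between grouped and ungrouped subscribers can be illustrated with a small routing function. This is a sketch of the semantics only: `Subscriber` and `recipients` are hypothetical names, and the round-robin choice within a group is an assumption, since a real backend may pick the receiving member differently.

```typescript
// Hypothetical model of a subscription: a name plus an optional group.
type Subscriber = { name: string; group?: string };

// Returns the names of the subscribers that receive the event with the given sequence number.
function recipients(subscribers: Subscriber[], eventIndex: number): string[] {
  const out: string[] = [];
  const groups: Record<string, Subscriber[]> = {};
  for (const s of subscribers) {
    if (s.group === undefined) {
      out.push(s.name); // no group: receives every event
    } else {
      (groups[s.group] = groups[s.group] || []).push(s);
    }
  }
  for (const g of Object.keys(groups)) {
    const members = groups[g];
    // One member per group receives the event (round-robin is assumed here).
    out.push(members[eventIndex % members.length].name);
  }
  return out;
}

const subs: Subscriber[] = [
  { name: 'logger' },
  { name: 'worker-a', group: 'workers' },
  { name: 'worker-b', group: 'workers' },
]
console.log(recipients(subs, 0)) // [ 'logger', 'worker-a' ]
console.log(recipients(subs, 1)) // [ 'logger', 'worker-b' ]
```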
unsubscribe(topic, cb)
Removes a previously registered callback from a topic.
await pubsub.unsubscribe('my-topic', callback)
flush()
Waits for any in-flight deliveries to settle. Call this before shutdown to avoid dropping events.
await pubsub.flush()
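For a custom backend, one way to give `flush` something to wait on is to track each delivery until it settles. The sketch below assumes deliveries are represented as promises. `InFlightTracker` is a hypothetical helper, not part of Mastra.

```typescript
// Hypothetical helper: remembers deliveries that have started but not yet settled.
class InFlightTracker {
  private pending: Promise<void>[] = [];

  // Register a delivery. It is removed from the list once it settles either way.
  track(delivery: Promise<void>): void {
    const settled: Promise<void> = delivery
      .catch(() => undefined) // a failed delivery still counts as settled
      .then(() => {
        this.pending = this.pending.filter(p => p !== settled);
      });
    this.pending.push(settled);
  }

  // Resolves once every delivery tracked before this call has settled.
  async flush(): Promise<void> {
    await Promise.all(this.pending.slice());
  }

  // Number of deliveries still in flight.
  count(): number {
    return this.pending.length;
  }
}
```

Deliveries that start while `flush` is already waiting are not covered by that call, so a shutdown path should stop publishing before it flushes.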
Replay methods
These methods support resuming a stream after a disconnect. The default implementations fall back to a regular subscribe, so backends without history support behave as live-only. CachingPubSub overrides them to replay cached events.
getHistory(topic, offset?)
Returns cached events for a topic, starting at offset. Returns an empty array when the backend has no history.
const events = await pubsub.getHistory('my-topic', 0)
Returns: Promise<Event[]>
subscribeWithReplay(topic, cb)
Replays cached events, then subscribes to live events.
await pubsub.subscribeWithReplay('my-topic', event => {
  console.log(event)
})
subscribeFromOffset(topic, offset, cb)
Replays cached events starting at a known position, then subscribes to live events. This is more efficient than a full replay when the client knows its last position.
await pubsub.subscribeFromOffset('my-topic', 42, event => {
  console.log(event)
})
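The replay-then-live behavior can be sketched with a single-topic, in-memory log. This is an illustration of the idea, not how CachingPubSub is implemented: `ReplayLog` and `LogEvent` are hypothetical names, the methods are synchronous for brevity, and the offset is assumed to be the zero-based position of an event in the cached history.

```typescript
// Hypothetical, trimmed-down event shape for the sketch.
type LogEvent = { type: string; data: unknown };
type Listener = (event: LogEvent) => void;

class ReplayLog {
  private history: LogEvent[] = [];
  private listeners: Listener[] = [];

  // Cache the event, then deliver it to live listeners.
  publish(event: LogEvent): void {
    this.history.push(event);
    this.listeners.slice().forEach(cb => cb(event));
  }

  // Cached events from `offset` onward. Empty when nothing is cached.
  getHistory(offset = 0): LogEvent[] {
    return this.history.slice(offset);
  }

  // Replay cached events from `offset`, then attach for live events.
  subscribeFromOffset(offset: number, cb: Listener): void {
    this.getHistory(offset).forEach(e => cb(e));
    this.listeners.push(cb);
  }

  // A full replay is the special case of offset 0.
  subscribeWithReplay(cb: Listener): void {
    this.subscribeFromOffset(0, cb);
  }
}
```

A client that has already processed the first event would call `subscribeFromOffset(1, cb)` and receive the rest of the cache followed by anything published afterwards.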
Properties
supportedModes:
Types
Event
type:
id:
data:
runId:
createdAt:
index?:
deliveryAttempt?:
SubscribeOptions
group?:
EventCallback
The callback signature for subscribers: (event: Event, ack?: () => Promise<void>, nack?: () => Promise<void>) => void.
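Because `ack` and `nack` are optional and asynchronous while the callback itself returns void, a subscriber typically has to guard for their absence and handle their promises itself. The wrapper below is one way to do that. `withAck`, `PubSubEvent`, and `PubSubCallback` are hypothetical names, the event field types are assumptions, and what a backend does after a `nack` (for example redelivery) is not specified on this page.

```typescript
// Stand-ins for `Event` and `EventCallback`; the field types are assumptions.
type PubSubEvent = { id: string; type: string; data: unknown; runId: string; createdAt: Date };
type PubSubCallback = (
  event: PubSubEvent,
  ack?: () => Promise<void>,
  nack?: () => Promise<void>,
) => void;

// Hypothetical helper: acknowledge when the handler succeeds, negatively acknowledge when it throws.
function withAck(handle: (event: PubSubEvent) => Promise<void>): PubSubCallback {
  return (event, ack, nack) => {
    handle(event)
      .then(
        () => (ack ? ack() : undefined), // backends without acks pass nothing
        () => (nack ? nack() : undefined),
      )
      .catch(err => console.error('ack/nack failed', err));
  };
}
```

The wrapped handler can be passed wherever an `EventCallback` is expected, for example as the second argument to `subscribe`.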