
PubSub

PubSub is the abstract base class for Mastra's event system. It defines the contract that every pub/sub backend implements, so the rest of Mastra can publish and subscribe to events without knowing which transport is in use.

Mastra uses pub/sub internally for workflow event processing, streaming, and cross-component communication. Most applications use the default EventEmitterPubSub and never construct a PubSub directly. Implement this class only when you need a custom transport.

For built-in implementations, see EventEmitterPubSub, UnixSocketPubSub, CachingPubSub, RedisStreamsPubSub, and GoogleCloudPubSub.

Usage example

Extend PubSub and implement the four abstract methods to add a custom backend.

src/mastra/pubsub.ts
import { PubSub } from '@mastra/core/events'
import type { Event, EventCallback, SubscribeOptions } from '@mastra/core/events'

export class CustomPubSub extends PubSub {
  async publish(topic: string, event: Omit<Event, 'id' | 'createdAt'>): Promise<void> {
    // Deliver the event to subscribers of `topic`.
  }

  async subscribe(topic: string, cb: EventCallback, options?: SubscribeOptions): Promise<void> {
    // Register `cb` to receive events published to `topic`.
  }

  async unsubscribe(topic: string, cb: EventCallback): Promise<void> {
    // Remove a previously registered callback.
  }

  async flush(): Promise<void> {
    // Wait for any in-flight deliveries to settle.
  }
}
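
As an illustration of what the four method bodies might contain, here is a minimal in-memory sketch. It is not Mastra's implementation: `InMemoryPubSub`, `PubSubEvent`, and `PubSubCallback` are hypothetical names, the two types are local stand-ins for `Event` and `EventCallback` so the snippet runs without Mastra installed, and subscriber groups are ignored. A real backend would extend `PubSub` as shown above.

```typescript
// Stand-ins for Event and EventCallback from '@mastra/core/events' (assumed
// shapes, taken from the Types section) so the sketch runs without Mastra.
type PubSubEvent = { type: string; id: string; data: unknown; runId: string; createdAt: Date }
type PubSubCallback = (
  event: PubSubEvent,
  ack?: () => Promise<void>,
  nack?: () => Promise<void>,
) => void

// Hypothetical in-memory backend: one Set of callbacks per topic.
class InMemoryPubSub {
  private subscribers = new Map<string, Set<PubSubCallback>>()
  private nextId = 0

  async publish(topic: string, event: Omit<PubSubEvent, 'id' | 'createdAt'>): Promise<void> {
    // The implementation assigns `id` and `createdAt`.
    const full: PubSubEvent = { ...event, id: String(++this.nextId), createdAt: new Date() }
    const callbacks = this.subscribers.get(topic)
    if (!callbacks) return
    // Copy the set so a callback that unsubscribes mid-delivery does not disturb iteration.
    Array.from(callbacks).forEach(cb => cb(full))
  }

  async subscribe(topic: string, cb: PubSubCallback): Promise<void> {
    const callbacks = this.subscribers.get(topic) ?? new Set<PubSubCallback>()
    callbacks.add(cb)
    this.subscribers.set(topic, callbacks)
  }

  async unsubscribe(topic: string, cb: PubSubCallback): Promise<void> {
    this.subscribers.get(topic)?.delete(cb)
  }

  async flush(): Promise<void> {
    // Delivery is synchronous in this sketch, so nothing is ever in flight.
  }
}
```

Because delivery happens synchronously inside `publish`, `flush` has nothing to wait for here. A networked transport would need to track pending sends.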

Pass the instance to the Mastra constructor:

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { CustomPubSub } from './pubsub'

export const mastra = new Mastra({
  pubsub: new CustomPubSub(),
})

Delivery modes

A PubSub declares which delivery modes it supports through the supportedModes property. Mastra reads this to decide whether to run a long-lived worker that pulls events.

Mode | Description
pull | Consumers actively read from the broker, for example Redis Streams XREADGROUP. Mastra runs an orchestration worker to read.
push | Events arrive without the consumer asking, either in-process or through an HTTP endpoint. No read loop is required.

The default is ['pull'], so custom implementations that do not declare supportedModes keep the existing pull-based behavior. Push delivery is opt-in.
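
The following sketch shows one way a backend could declare its modes and how a caller could check them. Everything in it is hypothetical: `WebhookBackend`, `PollingBackend`, and `needsPullWorker` are made-up names, and declaring `supportedModes` as a readonly field is an assumption about how the property is overridden. Mastra's actual decision logic is not shown on this page.

```typescript
type DeliveryMode = 'pull' | 'push'

// Hypothetical backend that receives events through an HTTP endpoint.
class WebhookBackend {
  readonly supportedModes: ReadonlyArray<DeliveryMode> = ['push']
}

// Hypothetical backend that keeps the documented default.
class PollingBackend {
  readonly supportedModes: ReadonlyArray<DeliveryMode> = ['pull']
}

// Illustrative check only: a read loop is needed when 'pull' is among the supported modes.
function needsPullWorker(backend: { supportedModes: ReadonlyArray<DeliveryMode> }): boolean {
  return backend.supportedModes.indexOf('pull') !== -1
}
```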

Methods

Core methods

publish(topic, event)

Publishes an event to a topic. The id and createdAt fields are assigned by the implementation.

await pubsub.publish('my-topic', {
  type: 'example',
  data: { value: 1 },
  runId: 'run-123',
})

subscribe(topic, cb, options?)

Registers a callback to receive events published to a topic. When options.group is set, subscribers in the same group compete for messages and each event is delivered to one member. Without a group, every subscriber receives every event.

await pubsub.subscribe('my-topic', (event, ack, nack) => {
  console.log(event)
})
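
The difference between grouped and ungrouped subscribers can be sketched with a small in-memory model. `GroupedTopic` and `Handler` are hypothetical, and picking the group member by round-robin is an assumption made for the example. A real broker may choose the receiving member differently.

```typescript
type Handler = (event: string) => void

// Hypothetical single-topic model of fan-out versus competing consumers.
class GroupedTopic {
  private ungrouped: Handler[] = []
  private groups = new Map<string, Handler[]>()
  private cursors = new Map<string, number>()

  subscribe(cb: Handler, options?: { group?: string }): void {
    const group = options?.group
    if (group === undefined) {
      this.ungrouped.push(cb)
      return
    }
    const members = this.groups.get(group) ?? []
    members.push(cb)
    this.groups.set(group, members)
  }

  publish(event: string): void {
    // Without a group, every subscriber receives every event.
    this.ungrouped.forEach(cb => cb(event))
    // Within a group, exactly one member receives each event (round-robin here).
    this.groups.forEach((members, group) => {
      const cursor = this.cursors.get(group) ?? 0
      members[cursor % members.length](event)
      this.cursors.set(group, cursor + 1)
    })
  }
}
```

With two subscribers in the same group and one ungrouped subscriber, publishing two events delivers both to the ungrouped subscriber and one to each group member.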

unsubscribe(topic, cb)

Removes a previously registered callback from a topic.

await pubsub.unsubscribe('my-topic', callback)

flush()

Waits for any in-flight deliveries to settle. Call this before shutdown to avoid dropping events.

await pubsub.flush()
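
For a custom backend, one possible way to implement `flush` is to track every pending delivery and wait until the set is empty. This is a sketch under that assumption, not how any built-in implementation works. `DeliveryTracker` and `track` are hypothetical names.

```typescript
// Hypothetical helper that a custom backend could use to implement flush().
class DeliveryTracker {
  private inFlight = new Set<Promise<void>>()

  // Register a pending delivery. Failures are swallowed so flush() never rejects.
  track(delivery: Promise<void>): void {
    const settled: Promise<void> = delivery
      .catch(() => {})
      .then(() => {
        this.inFlight.delete(settled)
      })
    this.inFlight.add(settled)
  }

  // Resolve once every tracked delivery has settled, including any added while waiting.
  async flush(): Promise<void> {
    while (this.inFlight.size > 0) {
      await Promise.all(Array.from(this.inFlight))
    }
  }
}
```

The loop matters because a delivery that completes can trigger further publishes. A single `Promise.all` would miss anything tracked after the call started.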

Replay methods

These methods support resuming a stream after a disconnect. In the default implementations, getHistory returns an empty array and the subscribe variants fall back to a regular subscribe, so backends without history support behave as live-only. CachingPubSub overrides them to replay cached events.

getHistory(topic, offset?)

Returns cached events for a topic, starting at offset. Returns an empty array when the backend has no history.

const events = await pubsub.getHistory('my-topic', 0)

Returns: Promise<Event[]>

subscribeWithReplay(topic, cb)

Replays cached events, then subscribes to live events.

await pubsub.subscribeWithReplay('my-topic', event => {
  console.log(event)
})

subscribeFromOffset(topic, offset, cb)

Replays cached events starting at a known position, then subscribes to live events. This is more efficient than a full replay when the client knows its last position.

await pubsub.subscribeFromOffset('my-topic', 42, event => {
  console.log(event)
})
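
The relationship between the three replay methods can be sketched with a single-topic, in-memory log. `ReplayLog` and `IndexedEvent` are hypothetical, and treating `offset` as the inclusive index of the first event to replay is an assumption. CachingPubSub's actual behavior is not documented here.

```typescript
type IndexedEvent = { index: number; data: unknown }
type Listener = (event: IndexedEvent) => void

// Hypothetical single-topic log that caches every published event.
class ReplayLog {
  private history: IndexedEvent[] = []
  private live: Listener[] = []

  publish(data: unknown): void {
    const event: IndexedEvent = { index: this.history.length, data }
    this.history.push(event)
    this.live.forEach(cb => cb(event))
  }

  // Cached events starting at `offset` (assumed inclusive).
  getHistory(offset = 0): IndexedEvent[] {
    return this.history.slice(offset)
  }

  // Replay from a known position, then keep receiving live events.
  subscribeFromOffset(offset: number, cb: Listener): void {
    this.getHistory(offset).forEach(event => cb(event))
    this.live.push(cb)
  }

  // A full replay is the same operation starting at offset 0.
  subscribeWithReplay(cb: Listener): void {
    this.subscribeFromOffset(0, cb)
  }
}
```

Replay and live registration happen synchronously in this sketch, so no event can slip between the two steps. An asynchronous backend would need to guard against that gap.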

Properties

supportedModes: ReadonlyArray<"pull" | "push">
Delivery modes the implementation supports. Defaults to `["pull"]`.

Types

Event

type: string
Event type identifier.

id: string
Unique event ID, assigned by the implementation on publish.

data: any
Event payload.

runId: string
Run the event belongs to.

createdAt: Date
Timestamp assigned by the implementation on publish.

index?: number
Sequential position used to resume from a specific offset.

deliveryAttempt?: number
Number of times the event has been delivered. Starts at 1. Defaults to 1 when the backend does not track redelivery.

SubscribeOptions

group?: string
When set, subscribers with the same group compete for messages and each event is delivered to one member. When omitted, every subscriber receives every event.

EventCallback

The callback signature for subscribers: (event: Event, ack?: () => Promise<void>, nack?: () => Promise<void>) => void.

event: Event
The delivered event.

ack?: () => Promise<void>
Acknowledge successful processing. The event is removed from the queue.

nack?: () => Promise<void>
Negative acknowledge. The event is requeued for redelivery after a delay.
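
Because `ack` and `nack` are optional, a callback should tolerate backends that do not pass them. The wrapper below is a sketch of one way to do that: it acknowledges when the handler succeeds and negatively acknowledges when it throws. `withAcknowledgement` is a hypothetical helper, not part of Mastra, and `PubSubEvent` is a reduced local stand-in for `Event`.

```typescript
// Reduced stand-in for Event (assumed shape) so the sketch runs without Mastra.
type PubSubEvent = { type: string; data: unknown; runId: string }
type Acknowledge = () => Promise<void>

// Hypothetical wrapper: turns an async handler into a subscriber callback.
function withAcknowledgement(handle: (event: PubSubEvent) => Promise<void>) {
  return async (event: PubSubEvent, ack?: Acknowledge, nack?: Acknowledge): Promise<void> => {
    try {
      await handle(event)
    } catch {
      // Processing failed: ask the backend to redeliver, if it supports that.
      await nack?.()
      return
    }
    // Processing succeeded: let the backend drop the event from its queue.
    await ack?.()
  }
}
```

The returned function resolves to `void`, so it can be passed wherever the callback signature above is expected.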