EventEmitterPubSub
EventEmitterPubSub is the default PubSub implementation. It delivers events in-process using a Node.js EventEmitter, so it works without any external service.
Use it for single-process applications. For delivery across processes on one host, see UnixSocketPubSub. For distributed delivery, see RedisStreamsPubSub or GoogleCloudPubSub.
Because it's in-process, events aren't persisted and aren't shared with other processes. Wrap it in CachingPubSub when you need replay for resumable streams.
Usage example
EventEmitterPubSub is used automatically when you don't configure a pubsub option, so most applications never construct it directly. Create one explicitly only when you want to configure or share it.
import { Mastra } from '@mastra/core'
import { EventEmitterPubSub } from '@mastra/core/events'

export const mastra = new Mastra({
  pubsub: new EventEmitterPubSub(),
})
To share an emitter with other parts of your application, pass an existing EventEmitter:
import EventEmitter from 'node:events'
import { EventEmitterPubSub } from '@mastra/core/events'
const emitter = new EventEmitter()
const pubsub = new EventEmitterPubSub(emitter)
To surface batched-delivery errors, pass a logger:
import { EventEmitterPubSub } from '@mastra/core/events'
// `logger` is a logger instance created elsewhere in your application
const pubsub = new EventEmitterPubSub(undefined, { logger })
Constructor parameters
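existingEmitter?: An existing EventEmitter to deliver events through, instead of creating a new one.
options?: Optional settings, such as a logger used to surface batched-delivery errors.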
Properties
supportedModes:
supportsNativeBatching:
Methods
EventEmitterPubSub implements the PubSub contract. The methods below have behavior specific to this implementation.
subscribe(topic, cb, options?)
Registers a callback for a topic. Without options.group, every subscriber receives every event. With a group, events are distributed round-robin across members of that group.
Pass options.batch to opt in to batched delivery. See Batching below.
await pubsub.subscribe('workflow.events', (event, ack, nack) => {
  console.log(event)
})
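The difference between fan-out and grouped delivery can be pictured with a small stand-alone model. This is only a sketch of the behavior described above, not the EventEmitterPubSub source: the `TopicModel` class, its method signatures, and the subscriber names are all hypothetical.

```typescript
// Illustrative model only: TopicModel is hypothetical, not part of @mastra/core.
type Handler = (event: string) => void

class TopicModel {
  private fanOut: Handler[] = []
  private groups = new Map<string, { members: Handler[]; next: number }>()

  subscribe(handler: Handler, group?: string): void {
    if (group === undefined) {
      this.fanOut.push(handler)
      return
    }
    const entry = this.groups.get(group) ?? { members: [], next: 0 }
    entry.members.push(handler)
    this.groups.set(group, entry)
  }

  publish(event: string): void {
    // Without a group, every subscriber receives every event.
    this.fanOut.forEach(handler => handler(event))
    // With a group, one member receives the event, chosen in rotation.
    this.groups.forEach(entry => {
      const handler = entry.members[entry.next % entry.members.length]
      entry.next += 1
      if (handler !== undefined) handler(event)
    })
  }
}

const audit: string[] = []
const workerA: string[] = []
const workerB: string[] = []

const topic = new TopicModel()
topic.subscribe(e => audit.push(e)) // fan-out subscriber
topic.subscribe(e => workerA.push(e), 'workers') // group member
topic.subscribe(e => workerB.push(e), 'workers') // group member

for (const e of ['e1', 'e2', 'e3', 'e4']) topic.publish(e)
```

In this model the fan-out subscriber sees all four events, while the two members of the `workers` group alternate and see two each.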
flush()
Waits for any pending redeliveries from nack to fire before resolving.
await pubsub.flush()
close()
Removes all listeners and cancels pending redeliveries. Call this during graceful shutdown.
await pubsub.close()
Redelivery
When a grouped subscriber calls nack, the event is redelivered to the group after a short delay, and its deliveryAttempt count increases. Calling ack clears the tracking for that event. Fan-out subscribers receive no-op ack and nack functions, since each event reaches every subscriber once.
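The ack and nack flow can be sketched with a self-contained model. Several details here are assumptions made for illustration: the `ModelEvent` shape, a `deliveryAttempt` that starts at 1, and the `maxAttempts` cap are not taken from the library. The model also redelivers immediately to stay synchronous, whereas the behavior described above includes a short delay.

```typescript
// Illustrative model only: the event shape and this harness are hypothetical.
interface ModelEvent {
  id: string
  deliveryAttempt: number // assumed to start at 1
}
type ModelHandler = (event: ModelEvent, ack: () => void, nack: () => void) => void

// Delivers one event, redelivering after each nack until it is acked
// or maxAttempts (an assumption of this sketch) is reached.
function deliverWithRedelivery(id: string, handler: ModelHandler, maxAttempts: number): number[] {
  const attemptsSeen: number[] = []
  const tracked = new Set<string>([id])
  const state = { redeliver: false }
  let attempt = 1
  while (tracked.has(id) && attempt <= maxAttempts) {
    state.redeliver = false
    attemptsSeen.push(attempt)
    handler(
      { id, deliveryAttempt: attempt },
      () => { tracked.delete(id) }, // ack: clear tracking for the event
      () => { state.redeliver = true }, // nack: request another delivery
    )
    if (!state.redeliver) break
    attempt += 1 // each redelivery increases the attempt count
  }
  return attemptsSeen
}

// A handler that fails twice, then succeeds on the third attempt.
const attempts = deliverWithRedelivery(
  'evt-1',
  (event, ack, nack) => (event.deliveryAttempt < 3 ? nack() : ack()),
  5,
)
```

Here `attempts` records the `deliveryAttempt` value the handler saw on each delivery.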
Batching
EventEmitterPubSub honors options.batch natively. When a subscriber opts in, events are held in a per-subscriber in-memory buffer and delivered as consecutive callback invocations once a flush condition is met. Both fan-out and group subscribers can batch. See SubscribeBatchOptions for the full policy.
await pubsub.subscribe(
  'workflow.events',
  event => {
    console.log(event)
  },
  {
    batch: {
      maxSize: 10, // flush once 10 events have queued
      maxWaitMs: 500, // ...or after 500ms, whichever comes first
    },
  },
)
The buffer is in-memory and per-process, so batched state isn't persisted and doesn't survive a restart. A flush triggered by maxWaitMs is best-effort: if a step fails, for example a coalesce function that throws, the error is surfaced through the configured logger rather than thrown. flush() drains every batched subscriber buffer before resolving.
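The two flush conditions and the explicit drain can be sketched as a per-subscriber buffer. This is a minimal model under stated assumptions, not the library's implementation: the `BatchBuffer` class and its constructor arguments are hypothetical, and only `maxSize` and `maxWaitMs` correspond to options named above.

```typescript
// Illustrative model only: BatchBuffer is hypothetical, not part of @mastra/core.
class BatchBuffer<T> {
  private buffer: T[] = []
  private timer: ReturnType<typeof setTimeout> | undefined
  private readonly deliver: (event: T) => void
  private readonly maxSize: number
  private readonly maxWaitMs: number

  constructor(deliver: (event: T) => void, maxSize: number, maxWaitMs: number) {
    this.deliver = deliver
    this.maxSize = maxSize
    this.maxWaitMs = maxWaitMs
  }

  push(event: T): void {
    this.buffer.push(event)
    if (this.buffer.length >= this.maxSize) {
      this.flush() // size condition met
    } else if (this.timer === undefined) {
      // Start the wait timer when the first event of a batch arrives.
      this.timer = setTimeout(() => this.flush(), this.maxWaitMs)
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer)
      this.timer = undefined
    }
    const pending = this.buffer
    this.buffer = []
    // Deliver the batch as consecutive callback invocations.
    for (const event of pending) this.deliver(event)
  }
}

const delivered: number[] = []
const batch = new BatchBuffer<number>(e => delivered.push(e), 3, 500)
batch.push(1) // buffered
batch.push(2) // buffered
batch.push(3) // maxSize reached: 1, 2, 3 are delivered
batch.push(4) // buffered, wait timer started
batch.flush() // explicit drain: 4 is delivered and the timer is cleared
```

Because the buffer lives in a plain array, anything not yet flushed is lost if the process exits, which matches the in-memory caveat above.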