Publish and Subscribe with Mastra's Built-In Event System

Send or receive events with Redis Streams and Google Cloud Pub/Sub.

Paul ScanlonPaul Scanlon·

Jun 25, 2026

·

3 min read

Mastra apps can now publish and subscribe through a built-in event system. You can listen for events like workflow step completion, pause, or completion. Clients that disconnect can replay what they missed.

Non-Mastra apps can also publish their own events through the system. That means you can subscribe external systems to Mastra events, or subscribe Mastra agents and workflows to external events.

The default implementation is single-machine via electing one process as a broker with other processes connecting as clients (see UnixSocketPubSub). But we also support distributed pub/sub through Redis Streams or Google Cloud Pub/Sub (other services coming soon, or look at the reference implementations and Claude Code your own).

Because Mastra events operate over common pub/sub transports, you can enable interoperable communications with a wide range of external systems or processes. For example, having an agent summarize a user's browsing behavior before sign-up and sending a payload to your legacy CRM — or vice versa.

Get started

Install a transport adapter. This example uses Upstash:

GNU BashTerminal
npm install @mastra/redis-streams
note
Requires @mastra/core@1.34.0 or later, added in PR #16309.

Configure pubsub on your Mastra instance with RedisStreamsPubSub:

TypeScriptsrc/mastra/index.ts
import { Mastra } from "@mastra/core";
import { RedisStreamsPubSub } from "@mastra/redis-streams";
 
export const mastra = new Mastra({
  //...
  pubsub: new RedisStreamsPubSub({
    url: process.env.REDIS_URL!
  })
});

To publish events, call .pubsub.publish() with a topic id and an event object containing a type, runId, and any custom payload:

export const publishSignup = async (signupFormData: SignupFormData, trackedEvents: TrackedEvents[]) => {
  const agent = mastra.getAgent("leadAnalystAgent");
  const { summary, score } = await agent.generate("...", {});
 
  await mastra.pubsub.publish("acme.user.signup", {
    type: "signup.created",
    runId: signupFormData.id,
    data: {
      ...signupFormData,
      trackedEvents,
      summary,
      score
    }
  });
};

To subscribe to events, call .pubsub.subscribe() with a topic id and a handler to receive them:

await mastra.pubsub.subscribe("acme.crm.actions", async (event) => {
  console.log(event);
});

For more information and full configuration options, see:

Appendix

The main current event families are:

  • Workflow orchestration events on workflows: workflow.start, workflow.resume, workflow.cancel, workflow.step.run, workflow.step.end, workflow.end, workflow.fail, workflow.suspend
  • Workflow watch events on workflow.events.v2.$: the PubSub event type is watch, and event.data.type can be: workflow-start, workflow-finish, workflow-canceled, workflow-paused, workflow-step-start, workflow-step-finish, workflow-step-suspended, workflow-step-waiting, workflow-step-output, workflow-step-progress, workflow-step-result
  • Background task dispatch/result events: task.dispatch, task.resume, task.cancel, task.running, task.completed, task.failed, task.cancelled, task.output, task.suspended, task.resumed
  • Durable agent stream events on agent.stream.$: chunk, step-start, step-finish, finish, error, suspended
  • Agent thread stream runtime events: run-registered, stream-part, run-completed, run-suspended, run-aborted, run-failed, signal-enqueued
Share:
Paul Scanlon
Paul ScanlonTechnical Product Marketing Manager

Paul Scanlon sits between Developer Education and Product Marketing at Mastra. Previously, he was a Technical Product Marketing Manager at Neon and worked in Developer Relations at Gatsby, where he created educational content and developer experiences.

All articles by Paul Scanlon