Skip to Content
DocsWorkflowsStreamingnew

Workflow Streaming

Workflows in Mastra have access to a powerful streaming protocol! With seamless integration into tools or agents as a step, you can stream responses directly back to your clients, creating a more interactive and engaging experience.

Usage

To use the new protocol, you can use the streamVNext method on an workflow. This method will return a custom MatraWorkflowStream. This stream extends a ReadableStream, so all basic stream methods are available.

const run = await myWorkflow.createRunAsync(); const stream = await run.streamVNext({ inputData: { city: 'New York' } }); for await (const chunk of stream) { console.log(chunk); }

Each chunk is a JSON object with the following properties:

{ type: string; runId: string; from: string; payload: Record<string, any>; }

We have a couple of utility functions on the stream to help you with the streaming process.

  • stream.status - The status of the workflow run.
  • stream.result - The result of the workflow run.
  • stream.usage - The total token usage of the workflow run.

How to use the stream in a tool

Each tool gets a writer argument, which is a writable stream with a custom write function. This write function is used to write the tool’s response to the stream.

src/mastra/workflows/weather.ts
import { createStep } from "@mastra/core/workflows"; import { z } from "zod"; export const weatherInfo = createStep({ id: "weather-info", inputSchema: z.object({ city: z.string(), }), outputSchema: z.object({ conditions: z.string(), temperature: z.number(), }), description: `Fetches the current weather information for a given city`, execute: async ({ inputData: { city }, writer }) => { writer.write({ type: "weather-data", args: { city }, status: "pending" }) // Tool logic here (e.g., API call) console.log("Using tool to fetch weather information for", city); writer.write({ type: "weather-data", args: { city }, status: "success", result: { temperature: 20, conditions: "Sunny" } }) return { temperature: 20, conditions: "Sunny" }; // Example return }, });

If you want to use the stream in an agent, you can use the streamVNext method on the agent and pipe it to the agent’s input stream.

src/mastra/workflows/weather.ts
import { createStep } from "@mastra/core/workflows"; import { z } from "zod"; export const weatherInfo = createStep({ id: "weather-info", inputSchema: z.object({ city: z.string(), }), outputSchema: z.object({ text: z.string(), }), description: `Fetches the current weather information for a given city`, execute: async ({ inputData: { city }, writer, mastra }) => { const agent = mastra.getAgent('weatherAgent') const stream = await agent.streamVNext(`What is the weather in ${city}?`); await stream.pipeTo(writer); return { text: await stream.text, } }, });

Piping the stream to the agent’s input stream will allow us to automatically sum up the usage of the agent so the total usage count can be calculated.