Workflow Streaming
Workflows in Mastra support a streaming protocol. When a tool or agent runs as a workflow step, its output can be streamed directly back to your clients, which makes for a more interactive experience.
Usage
To use the new protocol, call the streamVNext method on a workflow run. This method returns a custom MastraWorkflowStream. This stream extends ReadableStream, so all basic stream methods are available.
const run = await myWorkflow.createRunAsync();
const stream = await run.streamVNext({ inputData: { city: 'New York' } });
for await (const chunk of stream) {
  console.log(chunk);
}
Each chunk is a JSON object with the following properties:
{
  type: string;
  runId: string;
  from: string;
  payload: Record<string, any>;
}
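As a rough illustration of working with this chunk shape, the sketch below groups chunks by their type field. It is not part of Mastra: groupChunksByType and fakeStream are hypothetical helpers, and the type names "example-start" and "example-step" are made up for the demonstration. The only assumption about the real stream is that it can be consumed with for await, as shown above.

```typescript
// Chunk shape as described above.
type WorkflowChunk = {
  type: string;
  runId: string;
  from: string;
  payload: Record<string, any>;
};

// Hypothetical helper: drains any async iterable of chunks (such as the
// stream returned by streamVNext) and groups the chunks by `type`.
async function groupChunksByType(
  stream: AsyncIterable<WorkflowChunk>,
): Promise<Map<string, WorkflowChunk[]>> {
  const groups = new Map<string, WorkflowChunk[]>();
  for await (const chunk of stream) {
    const bucket = groups.get(chunk.type) ?? [];
    bucket.push(chunk);
    groups.set(chunk.type, bucket);
  }
  return groups;
}

// Stand-in stream for demonstration only; the values are invented.
async function* fakeStream(): AsyncGenerator<WorkflowChunk> {
  yield { type: "example-start", runId: "run-1", from: "WORKFLOW", payload: {} };
  yield { type: "example-step", runId: "run-1", from: "WORKFLOW", payload: { id: "a" } };
  yield { type: "example-step", runId: "run-1", from: "WORKFLOW", payload: { id: "b" } };
}

// Print how many chunks of each type were seen.
groupChunksByType(fakeStream()).then((groups) => {
  groups.forEach((chunks, type) => console.log(type, chunks.length));
});
```

In a real application, the stream returned by run.streamVNext would be passed in place of fakeStream().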
The stream also exposes a few properties that help you track the run:
stream.status - The status of the workflow run.
stream.result - The result of the workflow run.
stream.usage - The total token usage of the workflow run.
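A minimal sketch of reading these three properties together follows. The text above does not say whether they hold plain values or promises, so the sketch awaits each one, which works in either case. summarizeRun and RunSummarySource are hypothetical names, not part of Mastra.

```typescript
// Structural type: only the three properties named above are assumed.
type RunSummarySource = {
  status: unknown;
  result: unknown;
  usage: unknown;
};

// Hypothetical helper: collects status, result and usage into one object.
// Awaiting a plain value returns it unchanged, and awaiting a promise
// unwraps it, so this works whichever form the properties take.
async function summarizeRun(stream: RunSummarySource) {
  const [status, result, usage] = await Promise.all([
    stream.status,
    stream.result,
    stream.usage,
  ]);
  return { status, result, usage };
}
```

A reasonable place to call this is after the for await loop has finished, once the run has had a chance to complete.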
How to use the stream in a tool
Each tool receives a writer argument, which is a writable stream with a custom write function. Use this function to write the tool's response to the stream. The example below does this from the execute function of a workflow step.
import { createStep } from "@mastra/core/workflows";
import { z } from "zod";

export const weatherInfo = createStep({
  id: "weather-info",
  inputSchema: z.object({
    city: z.string(),
  }),
  outputSchema: z.object({
    conditions: z.string(),
    temperature: z.number(),
  }),
  description: `Fetches the current weather information for a given city`,
  execute: async ({ inputData: { city }, writer }) => {
    // Announce that the lookup has started
    writer.write({
      type: "weather-data",
      args: {
        city
      },
      status: "pending"
    })
    // Tool logic here (e.g., API call)
    console.log("Using tool to fetch weather information for", city);
    // Report the finished lookup together with its result
    writer.write({
      type: "weather-data",
      args: {
        city
      },
      status: "success",
      result: {
        temperature: 20,
        conditions: "Sunny"
      }
    })
    return { temperature: 20, conditions: "Sunny" }; // Example return
  },
});
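The pending/success pattern above can be factored into a small wrapper. The following is only a sketch under stated assumptions: withStatusEvents and ChunkWriter are hypothetical names, the writer is assumed to expose nothing beyond a write method, and the "error" status is an addition for illustration that does not appear in the example above.

```typescript
// Minimal structural type for the writer: only `write` is assumed.
type ChunkWriter = {
  write: (chunk: Record<string, unknown>) => unknown;
};

// Hypothetical helper: writes a "pending" event, runs the work, then
// writes a "success" event with the result. On failure it writes an
// "error" event (an assumed status) and rethrows the original error.
async function withStatusEvents<T>(
  writer: ChunkWriter,
  type: string,
  args: Record<string, unknown>,
  work: () => Promise<T>,
): Promise<T> {
  await writer.write({ type, args, status: "pending" });
  try {
    const result = await work();
    await writer.write({ type, args, status: "success", result });
    return result;
  } catch (error) {
    await writer.write({ type, args, status: "error", message: String(error) });
    throw error;
  }
}
```

Inside execute, the body of the step could then shrink to something like return withStatusEvents(writer, "weather-data", { city }, async () => ({ temperature: 20, conditions: "Sunny" })).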
If you want to use the stream with an agent, call the streamVNext method on the agent and pipe the resulting stream into the step's writer.
import { createStep } from "@mastra/core/workflows";
import { z } from "zod";

export const weatherInfo = createStep({
  id: "weather-info",
  inputSchema: z.object({
    city: z.string(),
  }),
  outputSchema: z.object({
    text: z.string(),
  }),
  description: `Fetches the current weather information for a given city`,
  execute: async ({ inputData: { city }, writer, mastra }) => {
    const agent = mastra.getAgent('weatherAgent')
    const stream = await agent.streamVNext(`What is the weather in ${city}?`);
    // Forward every chunk from the agent to the workflow stream
    await stream.pipeTo(writer);
    return {
      text: await stream.text,
    }
  },
});
Piping the agent's stream into the writer lets the workflow automatically add the agent's usage to its own, so the total usage count for the run can be calculated.