Workflow streaming
Workflow streaming in Mastra enables workflows to send incremental results while they execute, rather than waiting until completion. This allows you to surface partial progress, intermediate states, or progressive data directly to users or upstream agents and workflows.
Streams can be written to in two main ways:
- From within a workflow step: every workflow step receives a
writer
argument, which is a writable stream you can use to push updates as execution progresses. - From an agent stream: you can also pipe an agent’s
streamVNext
output directly into a workflow step’s writer, making it easy to chain agent responses into workflow results without extra glue code.
By combining writable workflow streams with agent streaming, you gain fine-grained control over how intermediate results flow through your system and into the user experience.
Using the writer
argument
The writer
argument is passed to a workflow step’s execute
function and can be used to emit custom events, data, or values into the active stream. This enables workflow steps to provide intermediate results or status updates while execution is still in progress.
You must await
the call to writer.write(...)
or else you will lock the stream and get a WritableStream is locked
error.
import { createStep } from "@mastra/core/workflows";
export const testStep = createStep({
// ...
execute: async ({ inputData, writer }) => {
const { value } = inputData;
await writer?.write({
type: "custom-event",
status: "pending"
});
const response = await fetch(...);
await writer?.write({
type: "custom-event",
status: "success"
});
return {
value: ""
};
},
});
Inspecting workflow stream payloads
Events written to the stream are included in the emitted chunks. These chunks can be inspected to access any custom fields, such as event types, intermediate values, or step-specific data.
const testWorkflow = mastra.getWorkflow("testWorkflow");
const run = await testWorkflow.createRunAsync();
const stream = await run.streamVNext({
inputData: {
value: "initial data"
}
});
for await (const chunk of stream) {
console.log(chunk);
}
Workflow using an agent
Pipe an agent’s textStream
to the workflow step’s writer
. This streams partial output, and Mastra automatically aggregates the agent’s usage into the workflow run.
import { createStep } from "@mastra/core/workflows";
import { z } from "zod";
export const testStep = createStep({
// ...
execute: async ({ inputData, mastra, writer }) => {
const { city } = inputData
const testAgent = mastra?.getAgent("testAgent");
const stream = await testAgent?.streamVNext(`What is the weather in ${city}$?`);
await stream!.textStream.pipeTo(writer!);
return {
value: await stream!.text,
};
},
});