As part of 0.11.1 (July 22), we have support for nested streaming in Mastra.
Previously, if you had an agent as part of a tool, you would lose the nested stream. The same was true of streaming within a workflow step. Also, if you had a long-running tool step, you couldn't stream intermediate progress.
Now, with nested streaming support, you can do all of those things. Nested agents within tools and workflow steps stream properly. In addition, we've unified the messaging interface across primitives (agents, workflows, etc.) and added token count metadata to help users add cost visibility.
The new message format
In order to ship this, we added an AI SDK-compatible messaging protocol that works across all Mastra primitives.
For every step undertaken in the process, you'll receive a specific message formatted as follows:
type Message = {
  type: string;
  runId: string;
  from: string;
  payload: Record<string, any>;
};
Note that streams are instances of Node.js ReadableStream, so you can pipe them, tee them, or do anything you'd normally do with streams. Right now, we're exposing this functionality as part of a streamVNext API, so you have to explicitly opt in to it, but we plan to merge it into the main stream function in a couple of weeks.
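As a rough illustration of the "tee them" point above, here is a minimal sketch that splits one message stream into two independent branches. It assumes the stream follows the standard web streams API that Node.js exposes globally; `mockStream`, `collect`, and `fanOut` are hypothetical helpers written for this example and are not part of Mastra.

```typescript
// Shape of a message, as described above.
type Message = {
  type: string;
  runId: string;
  from: string;
  payload: Record<string, any>;
};

// Hypothetical stand-in for a stream returned by streamVNext.
function mockStream(messages: Message[]): ReadableStream<Message> {
  return new ReadableStream<Message>({
    start(controller) {
      for (const message of messages) controller.enqueue(message);
      controller.close();
    },
  });
}

// Drain a stream into an array using a reader.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Split one stream into two branches, e.g. one for logging and one for a UI.
async function fanOut(stream: ReadableStream<Message>) {
  const [forLog, forUi] = stream.tee();
  // Read both branches concurrently so neither one buffers indefinitely.
  const [logged, shown] = await Promise.all([collect(forLog), collect(forUi)]);
  return { logged, shown };
}
```

Each branch sees every message, so a logger and a UI renderer can consume the same run without coordinating with each other.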
Agents
You can stream an agent using our new streamVNext function. This lets you track every action the agent performs in real time, step by step. You can also write custom messages from within tools or bring other agents into the process. When the stream finishes, you'll receive the total token usage across all involved primitives.
const result = agent.streamVNext([
  "What are some great activities to do in Tokyo?",
]);

for await (const chunk of result) {
  console.log({ chunk });
}
Here are the different message types you'll receive:
{
  "type": "start",
  "from": "AGENT",
  "payload": {}
},
{
  "type": "step-start",
  "from": "AGENT",
  "payload": {
    "messageId": string,
    "request": Request, // the raw request
    "warnings": []
  }
},
{
  "type": "tool-call",
  "from": "AGENT",
  "payload": {
    "toolCallId": string,
    "args": Record<string, unknown>, // the input args
    "toolName": string // name of the tool
  }
},
{
  "type": "tool-result",
  "from": "AGENT",
  "payload": {
    "toolCallId": string,
    "toolName": string,
    "result": Record<string, unknown> // the output args
  }
},
{
  "type": "step-finish",
  "from": "AGENT",
  "payload": {
    "reason": string,
    "usage": {
      "promptTokens": number,
      "completionTokens": number,
      "totalTokens": number
    },
    "response": Response, // raw response
    "messageId": string,
    "providerMetadata": {
      "openai": {
        "reasoningTokens": number,
        "acceptedPredictionTokens": number,
        "rejectedPredictionTokens": number,
        "cachedPromptTokens": number
      }
    }
  }
},
{
  "type": "finish",
  "from": "AGENT",
  "payload": {
    "usage": {
      "promptTokens": number,
      "completionTokens": number,
      "totalTokens": number
    },
    "providerMetadata": {
      "openai": {
        "reasoningTokens": number,
        "acceptedPredictionTokens": number,
        "rejectedPredictionTokens": number,
        "cachedPromptTokens": number
      }
    },
    "totalUsage": {
      "promptTokens": number,
      "completionTokens": number,
      "totalTokens": number
    }
  }
}
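One way to consume these message types is to switch on the `type` field. The sketch below is illustrative only: it works on an in-memory array of chunks shaped like the listing above, and `Chunk`, `Summary`, and `summarize` are hypothetical names introduced for this example.

```typescript
type Usage = {
  promptTokens: number;
  completionTokens: number;
  totalTokens: number;
};

// Loosely typed chunk, mirroring the fields shown in the listing above.
type Chunk = { type: string; from: string; payload: Record<string, any> };

type Summary = { toolCalls: string[]; steps: number; totalUsage?: Usage };

// Walk the chunks once and keep only what a simple dashboard might need.
function summarize(chunks: Chunk[]): Summary {
  const summary: Summary = { toolCalls: [], steps: 0 };
  for (const chunk of chunks) {
    switch (chunk.type) {
      case "tool-call":
        // Record which tool the agent invoked.
        summary.toolCalls.push(chunk.payload.toolName);
        break;
      case "step-finish":
        summary.steps += 1;
        break;
      case "finish":
        // The final message carries usage for the whole run.
        summary.totalUsage = chunk.payload.totalUsage;
        break;
      default:
        // Ignore message types this sketch does not care about.
        break;
    }
  }
  return summary;
}
```

In a real consumer the same switch could sit inside the `for await` loop, handling each chunk as it arrives instead of after the fact.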
You can also write custom messages within a tool:
import { createTool } from "@mastra/core/tools";

export const planActivities = createTool({
  // id, description, and schemas omitted for brevity
  execute: async ({ mastra, context, writer }) => {
    const agent = mastra!.getAgent("weatherAgent");
    const prompt = "some prompt";
    const response = agent.streamVNext([
      {
        role: "user",
        content: prompt,
      },
    ]);

    // Forward the nested agent's stream into the tool's own output stream
    await response.pipeTo(writer);

    // or simply call writer.write({})

    return {
      activities: await response.text,
    };
  },
});
This will pipe the agent output into the stream:
{
  type: 'tool-output',
  runId: string,
  from: 'USER',
  payload: {
    output: /* nested output of the agent */,
    toolCallId: string,
    toolName: string
  }
}
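For the `writer.write({})` route mentioned in the tool above, a long-running tool could report intermediate progress along these lines. This is a sketch under assumptions: the only thing it relies on is that the writer has a `write` method, and the `progress` message shape, `ProgressWriter`, and `processItems` are hypothetical names made up for illustration.

```typescript
// Minimal writer shape this sketch relies on (an assumption, not Mastra's type).
type ProgressWriter = {
  write: (chunk: Record<string, unknown>) => Promise<void> | void;
};

// Process items one at a time, emitting a progress message after each.
async function processItems(
  items: string[],
  writer: ProgressWriter,
): Promise<string[]> {
  const done: string[] = [];
  for (let i = 0; i < items.length; i++) {
    // Stand-in for real work such as an API call.
    done.push(items[i].toUpperCase());
    // Hypothetical custom message; the payload shape is up to you.
    await writer.write({
      type: "progress",
      completed: i + 1,
      total: items.length,
    });
  }
  return done;
}
```

Inside a tool's `execute` function, the `writer` argument would take the place of the mock writer used to exercise this sketch.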
To get the total usage at the end, simply call await stream.usage:
console.log("total usage", await stream.usage);
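Since the token counts are there to support cost visibility, here is a minimal sketch of turning a usage object into an estimated cost. The `Pricing` type, the `estimateCost` helper, and the prices in the usage example are hypothetical; substitute your provider's actual rates.

```typescript
type Usage = {
  promptTokens: number;
  completionTokens: number;
  totalTokens: number;
};

// Hypothetical pricing shape: currency units per one million tokens.
type Pricing = { promptPerMillion: number; completionPerMillion: number };

// Estimate the cost of a run from its token usage.
function estimateCost(usage: Usage, pricing: Pricing): number {
  const promptCost = usage.promptTokens * pricing.promptPerMillion;
  const completionCost = usage.completionTokens * pricing.completionPerMillion;
  return (promptCost + completionCost) / 1_000_000;
}

// Example with made-up prices: 2 per million prompt tokens, 8 per million completion tokens.
const exampleCost = estimateCost(
  { promptTokens: 1_000_000, completionTokens: 500_000, totalTokens: 1_500_000 },
  { promptPerMillion: 2, completionPerMillion: 8 },
);
console.log("estimated cost", exampleCost);
```

Prompt and completion tokens are priced separately here because providers commonly charge different rates for each.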
Workflows
You can stream a workflow using the same streamVNext function. This lets you track every step the workflow performs in real time. You can also write custom messages from within steps or bring agents into the process. When the stream finishes, you'll receive the total token usage across all involved primitives.
const result = run.streamVNext({
  inputData: { location: "New York" },
});

for await (const chunk of result) {
  console.log({ chunk });
}
Here are the different message types for workflows:
{
  "type": "start",
  "from": "WORKFLOW",
  "payload": {}
},
{
  "type": "step-start",
  "from": "WORKFLOW",
  "payload": {
    "stepName": string,
    "args": Record<string, unknown>,
    "stepCallId": string,
    "startedAt": number,
    "status": "running"
  }
},
{
  "type": "step-result",
  "runId": string,
  "from": "WORKFLOW",
  "payload": {
    "stepName": string,
    "result": Record<string, unknown>,
    "stepCallId": string,
    "status": "success",
    "endedAt": number
  }
},
{
  "type": "finish",
  "from": "WORKFLOW",
  "payload": {
    "totalUsage": {
      "promptTokens": number,
      "completionTokens": number,
      "totalTokens": number
    }
  }
}
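Because `step-start` carries `startedAt` and `step-result` carries `endedAt`, the two can be paired by `stepCallId` to time each step. The sketch below assumes both timestamps use the same unit (for example, epoch milliseconds), which the listing above does not state; `WorkflowChunk` and `stepDurations` are hypothetical names for this example.

```typescript
// Loosely typed chunk, mirroring the fields shown in the listing above.
type WorkflowChunk = {
  type: string;
  from: string;
  payload: Record<string, any>;
};

// Map each step name to its duration, pairing start and result by stepCallId.
function stepDurations(chunks: WorkflowChunk[]): Record<string, number> {
  const startedAt = new Map<string, number>();
  const durations: Record<string, number> = {};
  for (const chunk of chunks) {
    const { stepCallId, stepName } = chunk.payload;
    if (chunk.type === "step-start") {
      startedAt.set(stepCallId, chunk.payload.startedAt);
    } else if (chunk.type === "step-result") {
      const start = startedAt.get(stepCallId);
      // Skip results whose start message was never seen.
      if (start !== undefined) {
        durations[stepName] = chunk.payload.endedAt - start;
      }
    }
  }
  return durations;
}
```

Keying the intermediate map by `stepCallId` rather than `stepName` keeps the pairing correct even if messages from different steps interleave.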
You can also write custom messages within a step:
import { createStep } from "@mastra/core/workflows";

export const stepA = createStep({
  // id and schemas omitted for brevity
  execute: async ({ mastra, writer }) => {
    const agent = mastra!.getAgent("weatherAgent");
    const prompt = "some prompt";
    const response = agent.streamVNext([
      {
        role: "user",
        content: prompt,
      },
    ]);

    // Forward the nested agent's stream into the step's own output stream
    await response.pipeTo(writer);

    // or simply call writer.write({})

    return {
      activities: await response.text,
    };
  },
});
This will pipe the agent output into the stream:
{
  type: 'step-output',
  runId: string,
  from: 'USER',
  payload: {
    output: /* nested output of the agent */,
    toolCallId: string,
    toolName: string
  }
}
To get the total usage at the end, simply call await stream.usage:
console.log("total usage", await stream.usage);
What's Next
AgentNetwork streaming support is coming soon. Also, in a couple of weeks, we'll merge the streamVNext support back into the main stream function.
If you have any feedback or suggestions for improvement, let us know on Discord or GitHub.