Getting Started
To use vNext workflows, first import the necessary functions from the vNext module:
import { createWorkflow, createStep } from "@mastra/core/workflows/vNext";
import { z } from "zod"; // For schema validation
Key Concepts
vNext workflows consist of:
- Schemas: Type definitions for inputs and outputs using Zod
- Steps: Individual units of work with defined inputs and outputs
- Workflows: Orchestrations of steps with defined execution patterns. A workflow is also a step and can be used as such in other workflows.
- Workflow execution flow: How steps are executed and connected to each other
Schemas are defined using Zod for both the inputs and outputs of steps and workflows. Schemas can also dictate what data a step takes when resuming from a suspended state, as well as what contextual information should be passed when suspending a step’s execution.
The inputs and outputs of connected steps should match: the inputSchema of a step should be the same as the outputSchema of the previous step, for instance. The same applies when workflows are used as steps in other workflows: the workflow’s inputSchema should match the outputSchema of the step that precedes it.
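The matching rule can be pictured at the type level. The sketch below is plain TypeScript and uses neither Mastra nor Zod; `TypedStep` and `connect` are hypothetical names introduced only for illustration. It shows that two steps compose only when the first step's output type is the second step's input type.

```typescript
// Hypothetical stand-in for a step: a function from input type I to output type O.
// In vNext these shapes would come from inputSchema / outputSchema instead.
type TypedStep<I, O> = { id: string; run: (input: I) => O };

// Connect two steps. The shared type parameter M forces the first step's
// output to match the second step's input, mirroring the schema rule above.
function connect<I, M, O>(
  first: TypedStep<I, M>,
  second: TypedStep<M, O>,
): TypedStep<I, O> {
  return {
    id: `${first.id}->${second.id}`,
    run: (input: I) => second.run(first.run(input)),
  };
}

const parse: TypedStep<{ raw: string }, { value: number }> = {
  id: "parse",
  run: ({ raw }) => ({ value: Number(raw) }),
};

const double: TypedStep<{ value: number }, { doubled: number }> = {
  id: "double",
  run: ({ value }) => ({ doubled: value * 2 }),
};

// Compiles because parse's output shape equals double's input shape.
const pipeline = connect(parse, double);
const pipelineOutput = pipeline.run({ raw: "21" });
```

Swapping the arguments to `connect(double, parse)` would be rejected by the compiler, which is the same kind of mismatch the schema rule is meant to prevent.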
Steps are run using an execute function that receives a context object with inputs from the previous step and/or resume data if the step is being resumed from a suspended state. The execute function should return a value that matches its outputSchema.
Primitives such as .then(), .parallel() and .branch() describe the execution flow of workflows and how the steps within them are connected. When workflows run (whether standalone or as a step), their execution is dictated by their execution flow instead of an execute function. The final result of a workflow will always be the result of its last step, which should match the workflow’s outputSchema.
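To make the idea of an execution flow concrete, here is a toy model in plain TypeScript. It is not the Mastra implementation: `ToyFlow` is a hypothetical class, it omits `.branch()`, and the array shape it uses for parallel output is an assumption made for brevity. It only illustrates that the flow has no execute function of its own and that its result is whatever the last stage returns.

```typescript
// Toy model of an execution flow; NOT the Mastra implementation.
type StageFn = (input: unknown) => unknown;

class ToyFlow {
  private stages: StageFn[] = [];

  // Run one step after the previous one, feeding it the previous output.
  then(step: StageFn): this {
    this.stages.push(step);
    return this;
  }

  // Run several steps on the same input and collect their outputs in order.
  // (Array output is an assumption of this sketch.)
  parallel(steps: StageFn[]): this {
    this.stages.push((input) => steps.map((step) => step(input)));
    return this;
  }

  // The flow's result is simply the value returned by the last stage.
  run(input: unknown): unknown {
    return this.stages.reduce((value, stage) => stage(value), input);
  }
}

const flow = new ToyFlow()
  .then((n) => (n as number) + 1)
  .parallel([(n) => (n as number) * 2, (n) => (n as number) * 10])
  .then((results) => (results as number[]).reduce((a, b) => a + b, 0));

const flowResult = flow.run(1); // 1 -> 2 -> [4, 20] -> 24
```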
Creating Workflows
Steps
Steps are the building blocks of workflows. Create a step using createStep:
const myStep = createStep({
  id: "my-step",
  description: "Does something useful",
  inputSchema: z.object({
    inputValue: z.string(),
  }),
  outputSchema: z.object({
    outputValue: z.string(),
  }),
  resumeSchema: z.object({
    resumeValue: z.string(),
  }),
  suspendSchema: z.object({
    suspendValue: z.string(),
  }),
  execute: async ({
    inputData,
    mastra,
    getStepResult,
    getInitData,
    runtimeContext,
  }) => {
    // `step2` and `workflow` are assumed to be defined elsewhere
    const otherStepOutput = getStepResult(step2); // output of another step
    const initData = getInitData<typeof workflow>(); // typed as the workflow input schema
    return {
      outputValue: `Processed: ${inputData.inputValue}, ${initData.startValue} (runtimeContextValue: ${runtimeContext.get("runtimeContextValue")})`,
    };
  },
});
Each step requires:
- id: Unique identifier for the step
- inputSchema: Zod schema defining expected input
- outputSchema: Zod schema defining output shape
- resumeSchema: Optional. Zod schema defining resume input
- suspendSchema: Optional. Zod schema defining suspend input
- execute: Async function that performs the step’s work
The execute function receives a context object with:
- inputData: The input data matching the inputSchema
- resumeData: The resume data matching the resumeSchema, when resuming the step from a suspended state. Only exists if the step is being resumed.
- mastra: Access to mastra services (agents, tools, etc.)
- getStepResult: Function to access results from other steps
- getInitData: Function to access the initial input data of the workflow in any step
- suspend: Function to pause workflow execution (for user interaction)
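The interplay between `suspend` and `resumeData` can be sketched with a toy driver. Everything below is hypothetical plain TypeScript (`ToyContext`, `approvalStep`, `runToyStep`) and only mimics the shape of the context described above; it assumes a step that pauses on its first pass and finishes once resume data is supplied.

```typescript
// Toy model of the suspend/resume handshake; names and shapes are hypothetical.
type ToyContext = {
  inputData: { amount: number };
  resumeData?: { approved: boolean }; // only set when the step is resumed
  suspend: (payload: { reason: string }) => void;
};

type ToyOutcome =
  | { status: "suspended"; payload: { reason: string } }
  | { status: "success"; output: { message: string } };

function approvalStep(ctx: ToyContext): { message: string } | undefined {
  if (!ctx.resumeData) {
    // First pass: no user decision yet, so pause and explain why.
    ctx.suspend({ reason: `Approval needed for ${ctx.inputData.amount}` });
    return undefined;
  }
  return { message: ctx.resumeData.approved ? "approved" : "rejected" };
}

// Minimal driver that records whether the step suspended or finished.
function runToyStep(
  inputData: { amount: number },
  resumeData?: { approved: boolean },
): ToyOutcome {
  const state: { payload?: { reason: string } } = {};
  const output = approvalStep({
    inputData,
    resumeData,
    suspend: (payload) => {
      state.payload = payload;
    },
  });
  if (state.payload) return { status: "suspended", payload: state.payload };
  if (!output) throw new Error("step neither suspended nor returned output");
  return { status: "success", output };
}

const firstPass = runToyStep({ amount: 500 }); // suspends
const secondPass = runToyStep({ amount: 500 }, { approved: true }); // completes
```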
Workflow Structure
Create a workflow using createWorkflow:
import { Mastra } from "@mastra/core"; // needed for the Mastra instance below

const myWorkflow = createWorkflow({
  id: "my-workflow",
  inputSchema: z.object({
    startValue: z.string(),
  }),
  outputSchema: z.object({
    result: z.string(),
  }),
  steps: [step1, step2, step3], // Declare steps used in this workflow
});

// Register the workflow on a Mastra instance
const mastra = new Mastra({
  vnext_workflows: {
    myWorkflow,
  },
});

// Look the workflow up by name and create a run
const run = mastra.vnext_getWorkflow("myWorkflow").createRun();
The steps property in the workflow options provides type safety for accessing step results. When you declare the steps used in your workflow, TypeScript will ensure type safety when accessing result.steps:
// With steps declared in workflow options
const workflow = createWorkflow({
  id: "my-workflow",
  inputSchema: z.object({}),
  outputSchema: z.object({}),
  steps: [step1, step2], // TypeScript knows these steps exist
});

const result = await workflow.createRun().start({ inputData: {} });

if (result.status === "success") {
  console.log(result.result); // only exists if status is success
} else if (result.status === "failed") {
  console.error(result.error); // only exists if status is failed, this is an instance of Error
  throw result.error;
} else if (result.status === "suspended") {
  console.log(result.suspended); // only exists if status is suspended
}

// TypeScript knows these properties exist and their types
console.log(result.steps.step1.output); // Fully typed
console.log(result.steps.step2.output); // Fully typed
Workflow definition requires:
- id: Unique identifier for the workflow
- inputSchema: Zod schema defining workflow input
- outputSchema: Zod schema defining workflow output
- steps: Array of steps used in the workflow (optional, but recommended for type safety)
Re-using steps and nested workflows
You can re-use steps and nested workflows by cloning them:
const clonedStep = cloneStep(myStep, { id: "cloned-step" });
const clonedWorkflow = cloneWorkflow(myWorkflow, { id: "cloned-workflow" });
This way you can use the same step or nested workflow in the same workflow multiple times.
import {
  createWorkflow,
  createStep,
  cloneStep,
  cloneWorkflow,
} from "@mastra/core/workflows/vNext";

const myWorkflow = createWorkflow({
  id: "my-workflow",
  steps: [step1, step2, step3],
});
myWorkflow.then(step1).then(step2).then(step3).commit();

const parentWorkflow = createWorkflow({
  id: "parent-workflow",
  steps: [myWorkflow, step4],
});
parentWorkflow
  .then(myWorkflow)
  .then(step4)
  .then(cloneWorkflow(myWorkflow, { id: "cloned-workflow" }))
  .then(cloneStep(step4, { id: "cloned-step-4" }))
  .commit();
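One plausible reason a reused step needs a fresh id is that step results are keyed by step ID. The toy model below is plain TypeScript, not Mastra code: `toyCloneStep` and `runAll` are hypothetical helpers, and the overwrite behavior shown is an assumption of this sketch rather than documented Mastra behavior.

```typescript
// Toy illustration of why a reused step needs its own id; not the Mastra implementation.
type ToyStep = { id: string; run: (n: number) => number };

// Hypothetical counterpart of cloneStep: same logic, new id.
function toyCloneStep(step: ToyStep, opts: { id: string }): ToyStep {
  return { ...step, id: opts.id };
}

// Runs steps in order and records each output under the step's id.
function runAll(steps: ToyStep[], input: number): Record<string, number> {
  const results: Record<string, number> = {};
  let value = input;
  for (const step of steps) {
    value = step.run(value);
    results[step.id] = value;
  }
  return results;
}

const increment: ToyStep = { id: "increment", run: (n) => n + 1 };

// Same step twice: the second result overwrites the first under one key.
const collided = runAll([increment, increment], 0);

// Clone with a new id: both results are kept under separate keys.
const distinct = runAll(
  [increment, toyCloneStep(increment, { id: "increment-again" })],
  0,
);
```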
Running Workflows
After defining a workflow, run it with:
// Create a run instance
const run = myWorkflow.createRun();

// Start the workflow with input data
const result = await run.start({
  inputData: {
    startValue: "initial data",
  },
});

// Access the results
console.log(result.steps); // All step results
console.log(result.steps["step-id"].output); // Output from a specific step

if (result.status === "success") {
  console.log(result.result); // The final result of the workflow, result of the last step (or `.map()` output, if used as last step)
} else if (result.status === "suspended") {
  const resumeResult = await run.resume({
    step: result.suspended[0], // there is always at least one step id in the suspended array, in this case we resume the first suspended execution path
    resumeData: {
      /* user input */
    },
  });
} else if (result.status === "failed") {
  console.error(result.error); // only exists if status is failed, this is an instance of Error
}
Workflow Execution Result Schema
The result of running a workflow (either from start() or resume()) follows this TypeScript interface:
export type WorkflowResult<...> =
  | {
      status: 'success';
      result: z.infer<TOutput>;
      steps: {
        [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
          ? StepResult<unknown>
          : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
      };
    }
  | {
      status: 'failed';
      steps: {
        [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
          ? StepResult<unknown>
          : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
      };
      error: Error;
    }
  | {
      status: 'suspended';
      steps: {
        [K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
          ? StepResult<unknown>
          : StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
      };
      suspended: [string[], ...string[][]];
    };
Result Properties Explained
- status: Indicates the final state of the workflow execution
  - 'success': Workflow completed successfully
  - 'failed': Workflow encountered an error
  - 'suspended': Workflow is paused waiting for user input
- result: Contains the final output of the workflow, typed according to the workflow’s outputSchema
- suspended: Optional array of step IDs that are currently suspended. Only present when status is 'suspended'
- steps: A record containing the results of all executed steps
  - Keys are step IDs
  - Values are StepResult objects containing the step’s output
  - Type-safe based on each step’s outputSchema
- error: Optional error object present when status is 'failed'
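Because the result is a union discriminated by `status`, a `switch` lets TypeScript narrow to the fields that exist in each case. The sketch below uses a simplified, hand-written `SimpleResult` type (an assumption standing in for the generic `WorkflowResult` above) and a hypothetical `summarize` helper; it does not call Mastra.

```typescript
// Simplified, hand-written version of the result union shown above.
// Step results are reduced to `unknown` to keep the sketch short.
type SimpleResult =
  | { status: "success"; result: { result: string }; steps: Record<string, unknown> }
  | { status: "failed"; error: Error; steps: Record<string, unknown> }
  | { status: "suspended"; suspended: [string[], ...string[][]]; steps: Record<string, unknown> };

// Hypothetical helper: turns a result into a one-line summary.
function summarize(res: SimpleResult): string {
  switch (res.status) {
    case "success":
      // `result` is only accessible in this branch.
      return `done: ${res.result.result}`;
    case "failed":
      // `error` is only accessible in this branch.
      return `failed: ${res.error.message}`;
    case "suspended":
      // Per the type above, each entry is itself a string array; join it for display.
      return `waiting on: ${res.suspended.map((path) => path.join(".")).join(", ")}`;
  }
}

const okSummary = summarize({ status: "success", result: { result: "ok" }, steps: {} });
const failedSummary = summarize({ status: "failed", error: new Error("boom"), steps: {} });
const suspendedSummary = summarize({
  status: "suspended",
  suspended: [["approval"]],
  steps: {},
});
```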
Watching Workflow Execution
You can also watch workflow execution:
const run = myWorkflow.createRun();

// Add a watcher to monitor execution
run.watch(event => {
  // currentStep is optional, so guard the access
  console.log('Step completed:', event.payload.currentStep?.id);
});

// Start the workflow
const result = await run.start({ inputData: {...} });
The event object has the following schema:
type WatchEvent = {
  type: "watch";
  payload: {
    currentStep?: {
      id: string;
      status: "running" | "completed" | "failed" | "suspended";
      output?: Record<string, any>;
      payload?: Record<string, any>;
    };
    workflowState: {
      status: "running" | "success" | "failed" | "suspended";
      steps: Record<
        string,
        {
          status: "running" | "completed" | "failed" | "suspended";
          output?: Record<string, any>;
          payload?: Record<string, any>;
        }
      >;
      result?: Record<string, any>;
      error?: Record<string, any>;
      payload?: Record<string, any>;
    };
  };
  eventTimestamp: Date;
};
The currentStep property is only present when the workflow is running. When the workflow is finished, the status on workflowState is changed, as well as the result and error properties. At the same time, the currentStep property is removed.
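Since `currentStep` disappears once the workflow finishes, a watcher usually has to handle both cases. The following sketch uses a trimmed copy of the event shape (`ToyWatchEvent`) and a hypothetical `describeEvent` handler fed with hand-made sample events; it does not call `run.watch()` itself.

```typescript
// Trimmed copy of the WatchEvent shape documented above.
type StepStatus = "running" | "completed" | "failed" | "suspended";

type ToyWatchEvent = {
  type: "watch";
  payload: {
    currentStep?: { id: string; status: StepStatus };
    workflowState: { status: "running" | "success" | "failed" | "suspended" };
  };
  eventTimestamp: Date;
};

// Hypothetical handler: reports step progress while running,
// then the final workflow status once currentStep is gone.
function describeEvent(event: ToyWatchEvent): string {
  const { currentStep, workflowState } = event.payload;
  if (currentStep) {
    return `step ${currentStep.id} is ${currentStep.status}`;
  }
  return `workflow finished with status ${workflowState.status}`;
}

// Hand-made sample events standing in for what a watcher would receive.
const sampleEvents: ToyWatchEvent[] = [
  {
    type: "watch",
    payload: {
      currentStep: { id: "step1", status: "running" },
      workflowState: { status: "running" },
    },
    eventTimestamp: new Date(),
  },
  {
    type: "watch",
    payload: { workflowState: { status: "success" } },
    eventTimestamp: new Date(),
  },
];

const watchLines = sampleEvents.map(describeEvent);
```

A handler written this way could be passed to the watcher in place of the inline callback, assuming the real event matches the documented shape.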