Control Flow
When you build a workflow, you typically break down operations into smaller tasks that can be linked and reused. Steps provide a structured way to manage these tasks by defining inputs, outputs, and execution logic.
- If the schemas match, the outputSchema from each step is automatically passed to the inputSchema of the next step.
- If the schemas don’t match, use Input data mapping to transform the outputSchema into the expected inputSchema.
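The two cases above can be modelled in plain TypeScript. This is a minimal sketch and not Mastra's implementation: the Step type, the chain and mapData helpers, and the three example steps are all hypothetical names introduced for illustration. Steps whose output and input shapes line up are chained directly, and a mapping function bridges the one pair that does not.

```typescript
// Illustrative model only; Step, chain and mapData are hypothetical helpers.
type Step<I, O> = (input: I) => Promise<O>;

// Chaining only type-checks when the first step's output matches the second step's input.
function chain<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return async (input) => second(await first(input));
}

// Stands in for input data mapping: reshape one output into the next expected input.
function mapData<A, B>(transform: (value: A) => B): Step<A, B> {
  return async (value) => transform(value);
}

const fetchUser: Step<{ id: number }, { name: string }> = async ({ id }) => ({
  name: `user-${id}`,
});
const greet: Step<{ name: string }, { message: string }> = async ({ name }) => ({
  message: `Hello, ${name}`,
});
const shout: Step<{ text: string }, { text: string }> = async ({ text }) => ({
  text: text.toUpperCase(),
});

// fetchUser -> greet: shapes match, so the output flows straight through.
// greet -> shout: shapes differ ({ message } vs { text }), so a mapping sits in between.
const pipeline = chain(
  chain(
    chain(fetchUser, greet),
    mapData(({ message }: { message: string }) => ({ text: message })),
  ),
  shout,
);
```

Calling pipeline({ id: 7 }) resolves to { text: "HELLO, USER-7" }. Removing the mapData call makes the composition fail to type-check, which is the compile-time analogue of a schema mismatch.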
Sequential
Chain steps to execute in sequence using .then():
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({...});
export const testWorkflow = createWorkflow({...})
  .then(step1)
  .then(step2)
  .commit();
Parallel
Execute steps in parallel using .parallel():
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({...});
export const testWorkflow = createWorkflow({...})
  .parallel([step1, step2])
  .commit();
This executes all steps in the array concurrently, then continues to the next step after all parallel steps complete.
See Parallel Execution with Steps for more information.
Branch
Create conditional branches using .branch():
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const lessThanStep = createStep({...});
const greaterThanStep = createStep({...});
export const testWorkflow = createWorkflow({...})
  .branch([
    [async ({ inputData: { some_value } }) => some_value <= 9, lessThanStep],
    [async ({ inputData: { some_value } }) => some_value >= 10, greaterThanStep]
  ])
  .commit();
Branch conditions are evaluated sequentially, but steps with matching conditions are executed in parallel.
See Workflow with Conditional Branching for more information.
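To make the evaluation order concrete, here is a minimal plain-TypeScript model of the behavior described above. It is not Mastra's implementation: runBranch, Condition and BranchStep are hypothetical names, and returning the outputs as an object keyed by step id is a choice made for this sketch.

```typescript
// Illustrative model only; these types and names are not Mastra's API.
type Condition<I> = (input: I) => Promise<boolean>;
type BranchStep<I, O> = { id: string; run: (input: I) => Promise<O> };

async function runBranch<I, O>(
  input: I,
  branches: Array<[Condition<I>, BranchStep<I, O>]>,
): Promise<Record<string, O>> {
  // 1. Evaluate the conditions one at a time, in declaration order.
  const matched: BranchStep<I, O>[] = [];
  for (const [condition, step] of branches) {
    if (await condition(input)) matched.push(step);
  }
  // 2. Run every step whose condition matched, concurrently.
  const outputs: Record<string, O> = {};
  await Promise.all(
    matched.map(async (step) => {
      outputs[step.id] = await step.run(input);
    }),
  );
  return outputs;
}

type BranchInput = { some_value: number };

const lessThanStep: BranchStep<BranchInput, string> = {
  id: "lessThanStep",
  run: async ({ some_value }) => `${some_value} is at most 9`,
};
const greaterThanStep: BranchStep<BranchInput, string> = {
  id: "greaterThanStep",
  run: async ({ some_value }) => `${some_value} is at least 10`,
};

const branches: Array<[Condition<BranchInput>, BranchStep<BranchInput, string>]> = [
  [async ({ some_value }) => some_value <= 9, lessThanStep],
  [async ({ some_value }) => some_value >= 10, greaterThanStep],
];
```

With these conditions, runBranch({ some_value: 5 }, branches) runs only lessThanStep, and a non-integer value such as 9.5 matches neither condition, so no step runs. If the conditions overlapped, every matching step would run.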
Loops
Workflows support two types of loops. When looping a step, or any step-compatible construct like a nested workflow, the initial inputData is sourced from the output of the previous step.
To ensure compatibility, the loop’s initial input must either:
- Match the shape of the previous step’s output, or
- Be explicitly transformed using the map function.
Dowhile
Executes a step repeatedly while a condition is true.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const counterStep = createStep({...});
export const testWorkflow = createWorkflow({...})
  .dowhile(counterStep, async ({ inputData: { number } }) => number < 10)
  .commit();
Dountil
Executes a step repeatedly until a condition becomes true.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const counterStep = createStep({...});
export const testWorkflow = createWorkflow({...})
  .dountil(counterStep, async ({ inputData: { number } }) => number > 10)
  .commit();
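The difference between the two loop forms can be sketched in plain TypeScript. This is a model under stated assumptions, not Mastra's implementation: it assumes the usual do-while reading, in which the step runs once before the condition is first checked and the condition sees the step's latest output. The names doWhile, doUntil and the incrementing counterStep are hypothetical.

```typescript
// Illustrative model only; assumes the step always runs at least once.
type LoopStep<T> = (input: T) => Promise<T>;
type LoopCondition<T> = (output: T) => Promise<boolean>;

// Repeat the step while the condition holds for its latest output.
async function doWhile<T>(step: LoopStep<T>, condition: LoopCondition<T>, initial: T): Promise<T> {
  let current = await step(initial);
  while (await condition(current)) {
    current = await step(current);
  }
  return current;
}

// Repeat the step until the condition becomes true for its latest output.
async function doUntil<T>(step: LoopStep<T>, condition: LoopCondition<T>, initial: T): Promise<T> {
  let current = await step(initial);
  while (!(await condition(current))) {
    current = await step(current);
  }
  return current;
}

// Hypothetical counter step: adds one to the number it receives.
const counterStep: LoopStep<{ number: number }> = async ({ number }) => ({
  number: number + 1,
});
```

Under these assumptions, doWhile(counterStep, async ({ number }) => number < 10, { number: 0 }) stops at { number: 10 }, while doUntil(counterStep, async ({ number }) => number > 10, { number: 0 }) stops at { number: 11 }. The two forms are mirror images: a dountil condition is the negation of the equivalent dowhile condition.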
Foreach
Sequentially executes the same step for each item in the input array described by the inputSchema.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const mapStep = createStep({...});
export const testWorkflow = createWorkflow({...})
  .foreach(mapStep)
  .commit();
Example Run Instance
The following example demonstrates how to start a run with multiple inputs. Each input will pass through the mapStep sequentially.
import { mastra } from "./mastra";
const run = mastra.getWorkflow("testWorkflow").createRun();
const result = await run.start({
  inputData: [{ number: 10 }, { number: 100 }, { number: 200 }]
});
To execute this run from your terminal:
npx tsx src/test-workflow.ts
Concurrency
Optionally, set concurrency to execute the step for several items in parallel, with a limit on the number of concurrent executions.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const mapStep = createStep({...});
export const testWorkflow = createWorkflow({...})
  .foreach(mapStep, { concurrency: 2 })
  .commit();
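A concurrency limit of this kind can be modelled with a small worker pool. The sketch below is plain TypeScript and not Mastra's implementation: forEachWithLimit is a hypothetical helper, and returning results in input order is a choice made for this sketch.

```typescript
// Illustrative model only; forEachWithLimit is a hypothetical helper.
async function forEachWithLimit<I, O>(
  items: readonly I[],
  step: (item: I) => Promise<O>,
  concurrency: number = 1,
): Promise<O[]> {
  const results = new Array<O>(items.length);
  // Shared queue of pending jobs; each worker pulls the next job when it is free.
  const queue = items.map((item, index) => ({ item, index }));

  async function worker(): Promise<void> {
    for (let job = queue.shift(); job !== undefined; job = queue.shift()) {
      // Store by index so the results keep the order of the input items.
      results[job.index] = await step(job.item);
    }
  }

  // Never start more workers than the limit or than there are items.
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

With the default limit of 1 the items are processed strictly one after another, which matches the sequential behavior of a plain foreach. With a limit of 2, at most two calls to the step are in flight at any time.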
Parallel Workflows
Workflows themselves can also be executed in parallel.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const workflow1 = createWorkflow({...});
const workflow2 = createWorkflow({...});
export const testWorkflow = createWorkflow({...})
  .parallel([workflow1, workflow2])
  .commit();
Parallel steps receive previous step results as input. Their outputs are passed into the next step input as an object where the key is the step id and the value is the step output.
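The shape of that combined output can be illustrated in plain TypeScript. This is a minimal sketch of the described data flow, not Mastra's implementation: runParallel, NamedStep and the two example steps are hypothetical.

```typescript
// Illustrative model only; these types and names are not Mastra's API.
type NamedStep<I, O> = { id: string; run: (input: I) => Promise<O> };

async function runParallel<I, O>(
  input: I,
  steps: ReadonlyArray<NamedStep<I, O>>,
): Promise<Record<string, O>> {
  const outputs: Record<string, O> = {};
  // Every step receives the same input (the previous step's result) and runs concurrently.
  await Promise.all(
    steps.map(async (step) => {
      outputs[step.id] = await step.run(input);
    }),
  );
  // The next step receives one object: key = step id, value = that step's output.
  return outputs;
}

const doubleStep: NamedStep<{ value: number }, number> = {
  id: "step1",
  run: async ({ value }) => value * 2,
};
const incrementStep: NamedStep<{ value: number }, number> = {
  id: "step2",
  run: async ({ value }) => value + 1,
};
```

Here runParallel({ value: 3 }, [doubleStep, incrementStep]) resolves to { step1: 6, step2: 4 }, so a following step would read its inputs by step id.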
Nested Workflows
In the example below, nestedWorkflow is used as a step within testWorkflow. The testWorkflow uses step1 whilst the nestedWorkflow composes step2 and step3.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
export const nestedWorkflow = createWorkflow({...})
export const testWorkflow = createWorkflow({...})
  .then(nestedWorkflow)
  .commit();
When using .branch() or .parallel() to build more complex control flows, executing more than one step requires wrapping those steps in a nested workflow, along with a clear definition of how they should be executed.
Cloned Workflows
In the example below, clonedWorkflow is a clone of workflow1 and is used as a step within testWorkflow. The clonedWorkflow is run sequentially after step1.
import { createWorkflow, createStep, cloneWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
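const step1 = createStep({...});
const workflow1 = createWorkflow({...});
// Clone the workflow (not the step) so it can be reused under a new id.
const clonedWorkflow = cloneWorkflow(workflow1, { id: "cloned-workflow" });
export const testWorkflow = createWorkflow({...})
  .then(step1)
  .then(clonedWorkflow)
  .commit();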