Input Data Mapping
Input data mapping allows explicit mapping of values for the inputs of the next step. These values can come from a number of sources:
- The outputs of a previous step
- The runtime context
- A constant value
- The initial input of the workflow
myWorkflow
.then(step1)
.map({
transformedValue: {
step: step1,
path: "nestedValue",
},
runtimeContextValue: {
runtimeContextPath: "runtimeContextValue",
schema: z.number(),
},
constantValue: {
value: 42,
schema: z.number(),
},
initDataValue: {
initData: myWorkflow,
path: "startValue",
},
})
.then(step2)
.commit();
There are many cases where .map()
can be useful in matching inputs to outputs, whether it’s renaming outputs to match inputs or mapping complex data structures or other previous step outputs.
Renaming outputs
One use case for input mappings is renaming outputs to match inputs:
import { Mastra } from "@mastra/core";
import { createWorkflow, createStep } from "@mastra/core/workflows/vNext";
import { z } from "zod";
const step1 = createStep({
id: "step1",
inputSchema: z.object({
inputValue: z.string(),
}),
outputSchema: z.object({
outputValue: z.string(),
}),
execute: async ({ inputData, mastra }) => {
mastra.getLogger()?.debug(`Step 1 received: ${inputData.inputValue}`);
return { outputValue: `${inputData.inputValue}` };
},
});
const step2 = createStep({
id: "step2",
inputSchema: z.object({
unexpectedName: z.string(),
}),
outputSchema: z.string(),
execute: async ({ inputData, mastra }) => {
mastra.getLogger()?.debug(`Step 2 received: ${inputData.unexpectedName}`);
return `${inputData.unexpectedName}`;
},
});
const myWorkflow = createWorkflow({
id: "my-workflow",
inputSchema: z.object({
inputValue: z.string(),
}),
outputSchema: z.string(),
steps: [step1, step2],
})
.then(step1)
// mapping output from step1 "outputValue"
// to input for step2 "unexpectedName"
.map({
unexpectedName: {
step: step1,
path: "outputValue",
},
})
.then(step2)
.commit();
const mastra = new Mastra({
vnext_workflows: {
myWorkflow,
}
});
const run = mastra.vnext_getWorkflow("myWorkflow").createRun();
const res = await run.start({
inputData: { inputValue: "Hello world" }
});
if (res.status === "success") {
console.log(res.result);
}
Using workflow inputs as later step inputs
import { Mastra } from "@mastra/core";
import { createWorkflow, createStep } from "@mastra/core/workflows/vNext";
import { z } from "zod";
const step1 = createStep({
id: "step1",
inputSchema: z.object({
inputValue: z.string(),
}),
outputSchema: z.object({
outputValue: z.string(),
}),
execute: async ({ inputData, mastra }) => {
mastra.getLogger()?.debug(`Step 1 received: ${inputData.inputValue}`);
return { outputValue: `Processed: ${inputData.inputValue}` };
},
});
const step2 = createStep({
id: "step2",
inputSchema: z.object({
outputValue: z.string(),
initialValue: z.string(),
}),
outputSchema: z.object({
result: z.string(),
}),
execute: async ({ inputData, mastra }) => {
mastra.getLogger()?.debug(`Step 2 received: ${inputData.outputValue} and original: ${inputData.initialValue}`);
return { result: `Combined: ${inputData.outputValue} (original: ${inputData.initialValue})` };
},
});
const myWorkflow = createWorkflow({
id: "my-workflow",
inputSchema: z.object({
inputValue: z.string(),
}),
outputSchema: z.object({
result: z.string(),
}),
steps: [step1, step2],
})
myWorkflow
.then(step1)
.map({
outputValue: {
step: step1,
path: "outputValue",
},
initialValue: {
initData: myWorkflow,
path: "inputValue",
},
})
.then(step2)
.commit();
// Create Mastra instance with all workflows
const mastra = new Mastra({
vnext_workflows: {
myWorkflow,
}
});
const run = mastra.vnext_getWorkflow("myWorkflow").createRun();
const res = await run.start({
inputData: { inputValue: "Original input" }
});
if (res.status === "success") {
console.log("Result:", res.result);
}
Using multiple outputs of previous steps
import { Mastra } from "@mastra/core";
import { createWorkflow, createStep } from "@mastra/core/workflows/vNext";
import { z } from "zod";
const step1 = createStep({
id: "step1",
inputSchema: z.object({
inputValue: z.string(),
}),
outputSchema: z.object({
intermediateValue: z.string(),
}),
execute: async ({ inputData, mastra }) => {
mastra.getLogger()?.debug(`Step 1 received: ${inputData.inputValue}`);
return { intermediateValue: `Step 1: ${inputData.inputValue}` };
},
});
const step2 = createStep({
id: "step2",
inputSchema: z.object({
intermediateValue: z.string(),
}),
outputSchema: z.object({
currentResult: z.string(),
}),
execute: async ({ inputData, mastra }) => {
mastra.getLogger()?.debug(`Step 2 received: ${inputData.intermediateValue}`);
return { currentResult: `Step 2: ${inputData.intermediateValue}` };
},
});
const step3 = createStep({
id: "step3",
inputSchema: z.object({
currentResult: z.string(), // From step2
intermediateValue: z.string(), // From step1
initialValue: z.string(), // From workflow input
}),
outputSchema: z.object({
result: z.string(),
}),
execute: async ({ inputData, mastra }) => {
mastra.getLogger()?.debug(`Step 3 combining all previous data`);
return {
result: `Combined result:
- Initial input: ${inputData.initialValue}
- Step 1 output: ${inputData.intermediateValue}
- Step 2 output: ${inputData.currentResult}`
};
},
});
const myWorkflow = createWorkflow({
id: "my-workflow",
inputSchema: z.object({
inputValue: z.string(),
}),
outputSchema: z.object({
result: z.string(),
}),
steps: [step1, step2, step3],
})
myWorkflow
.then(step1)
.then(step2)
.map({
// Map values from different sources to step3's inputs
initialValue: {
initData: myWorkflow,
path: "inputValue",
},
currentResult: {
step: step2,
path: "currentResult",
},
intermediateValue: {
step: step1,
path: "intermediateValue",
},
})
.then(step3)
.commit();
// Create Mastra instance with all workflows
const mastra = new Mastra({
vnext_workflows: {
myWorkflow,
}
});
const run = mastra.vnext_getWorkflow("myWorkflow").createRun();
const res = await run.start({
inputData: { inputValue: "Starting data" }
});
if (res.status === "success") {
console.log("Result:", res.result);
}