並列実行
多くのワークフローでは、複数の処理を同時に実行する必要があります。これらの例では、.parallel()
を使ってステップやワークフローを並行実行し、その結果を統合する方法を示します。
ステップを使った並列実行
この例では、ワークフローは .parallel()
を使って step1
と step2
を実行します。各ステップは同じ入力を受け取り、独立して実行されます。出力はステップの id
ごとに名前空間化され、まとめて step3
に渡されます。step3
は結果を合成し、最終的な値を返します。
src/mastra/workflows/example-parallel-steps.ts
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
id: "step-1",
description: "passes value from input to output",
inputSchema: z.object({
value: z.number()
}),
outputSchema: z.object({
value: z.number()
}),
execute: async ({ inputData }) => {
const { value } = inputData;
return {
value
};
}
});
const step2 = createStep({
id: "step-2",
description: "passes value from input to output",
inputSchema: z.object({
value: z.number()
}),
outputSchema: z.object({
value: z.number()
}),
execute: async ({ inputData }) => {
const { value } = inputData;
return {
value
};
}
});
const step3 = createStep({
id: "step-3",
description: "sums values from step-1 and step-2",
inputSchema: z.object({
"step-1": z.object({ value: z.number() }),
"step-2": z.object({ value: z.number() })
}),
outputSchema: z.object({
value: z.number()
}),
execute: async ({ inputData }) => {
return {
value: inputData["step-1"].value + inputData["step-2"].value
};
}
});
export const parallelSteps = createWorkflow({
id: "parallel-workflow",
description: "A workflow that runs steps in parallel plus a final step",
inputSchema: z.object({
value: z.number()
}),
outputSchema: z.object({
value: z.number()
})
})
.parallel([step1, step2])
.then(step3)
.commit();
ワークフローによる並列実行
この例では、.parallel()
を使って workflow1
と workflow2
の2つのワークフローを同時に実行します。各ワークフローは、入力値をそのまま返す単一のステップを含みます。出力はワークフローの id
ごとに名前空間化され、step3
に渡されます。step3
は結果を合算して最終的な値を返します。
src/mastra/workflows/example-parallel-workflows.ts
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
id: "step-1",
description: "passes value from input to output",
inputSchema: z.object({
value: z.number()
}),
outputSchema: z.object({
value: z.number()
}),
execute: async ({ inputData }) => {
const { value } = inputData;
return {
value
};
}
});
const step2 = createStep({
id: "step-2",
description: "passes value from input to output",
inputSchema: z.object({
value: z.number()
}),
outputSchema: z.object({
value: z.number()
}),
execute: async ({ inputData }) => {
const { value } = inputData;
return {
value
};
}
});
const step3 = createStep({
id: "step-3",
description: "sums values from step-1 and step-2",
inputSchema: z.object({
"workflow-1": z.object({ value: z.number() }),
"workflow-2": z.object({ value: z.number() })
}),
outputSchema: z.object({
value: z.number()
}),
execute: async ({ inputData }) => {
return {
value: inputData["workflow-1"].value + inputData["workflow-2"].value
};
}
});
export const workflow1 = createWorkflow({
id: "workflow-1",
inputSchema: z.object({
value: z.number()
}),
outputSchema: z.object({
value: z.number()
})
})
.then(step1)
.commit();
export const workflow2 = createWorkflow({
id: "workflow-2",
inputSchema: z.object({
value: z.number()
}),
outputSchema: z.object({
value: z.number()
})
})
.then(step2)
.commit();
export const parallelWorkflows = createWorkflow({
id: "parallel-workflow",
inputSchema: z.object({
value: z.number()
}),
outputSchema: z.object({
value: z.number()
})
})
.parallel([workflow1, workflow2])
.then(step3)
.commit();
関連項目
ワークフロー(レガシー)
以下のリンクでは、レガシーなワークフローに関するドキュメント例を参照できます。