Human in the Loop Workflow
Human-in-the-loop ワークフローでは、特定のポイントで実行を一時停止し、ユーザーからの入力を収集したり、意思決定を行ったり、人間の判断が必要なアクションを実行したりすることができます。この例では、人間による介入ポイントを含むワークフローの作成方法を示します。
仕組み
- ワークフローステップは、
suspend()
関数を使って実行を一時停止することができ、オプションで人間の意思決定者のためのコンテキストを含むペイロードを渡すことができます。 - ワークフローが再開されると、人間の入力は
resume()
呼び出しのcontext
パラメータに渡されます。 - この入力は、ステップの実行コンテキスト内で
context.inputData
として利用可能になり、ステップのinputSchema
に従った型になります。 - その後、ステップは人間の入力に基づいて実行を続行できます。
このパターンにより、自動化されたワークフローにおいて、安全で型チェックされた人間の介入が可能になります。
Inquirer を使った対話型ターミナル例
この例では、Inquirer ライブラリを使用して、ワークフローが一時停止した際にターミナルから直接ユーザー入力を収集し、真にインタラクティブな human-in-the-loop 体験を実現する方法を示します。
import { Mastra } from "@mastra/core";
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
import { confirm, input, select } from "@inquirer/prompts";
// Step 1: Generate product recommendations
const generateRecommendations = new Step({
id: "generateRecommendations",
outputSchema: z.object({
customerName: z.string(),
recommendations: z.array(
z.object({
productId: z.string(),
productName: z.string(),
price: z.number(),
description: z.string(),
}),
),
}),
execute: async ({ context }) => {
const customerName = context.triggerData.customerName;
// In a real application, you might call an API or ML model here
// For this example, we'll return mock data
return {
customerName,
recommendations: [
{
productId: "prod-001",
productName: "Premium Widget",
price: 99.99,
description: "Our best-selling premium widget with advanced features",
},
{
productId: "prod-002",
productName: "Basic Widget",
price: 49.99,
description: "Affordable entry-level widget for beginners",
},
{
productId: "prod-003",
productName: "Widget Pro Plus",
price: 149.99,
description: "Professional-grade widget with extended warranty",
},
],
};
},
});
// Step 2: Get human approval and customization for the recommendations
const reviewRecommendations = new Step({
id: "reviewRecommendations",
inputSchema: z.object({
approvedProducts: z.array(z.string()),
customerNote: z.string().optional(),
offerDiscount: z.boolean().optional(),
}),
outputSchema: z.object({
finalRecommendations: z.array(
z.object({
productId: z.string(),
productName: z.string(),
price: z.number(),
}),
),
customerNote: z.string().optional(),
offerDiscount: z.boolean(),
}),
execute: async ({ context, suspend }) => {
const { customerName, recommendations } = context.getStepResult(
generateRecommendations,
) || {
customerName: "",
recommendations: [],
};
// Check if we have input from a resumed workflow
const reviewInput = {
approvedProducts: context.inputData?.approvedProducts || [],
customerNote: context.inputData?.customerNote,
offerDiscount: context.inputData?.offerDiscount,
};
// If we don't have agent input yet, suspend for human review
if (!reviewInput.approvedProducts.length) {
console.log(`Generating recommendations for customer: ${customerName}`);
await suspend({
customerName,
recommendations,
message:
"Please review these product recommendations before sending to the customer",
});
// Placeholder return (won't be reached due to suspend)
return {
finalRecommendations: [],
customerNote: "",
offerDiscount: false,
};
}
// Process the agent's product selections
const finalRecommendations = recommendations
.filter((product) =>
reviewInput.approvedProducts.includes(product.productId),
)
.map((product) => ({
productId: product.productId,
productName: product.productName,
price: product.price,
}));
return {
finalRecommendations,
customerNote: reviewInput.customerNote || "",
offerDiscount: reviewInput.offerDiscount || false,
};
},
});
// Step 3: Send the recommendations to the customer
const sendRecommendations = new Step({
id: "sendRecommendations",
outputSchema: z.object({
emailSent: z.boolean(),
emailContent: z.string(),
}),
execute: async ({ context }) => {
const { customerName } = context.getStepResult(generateRecommendations) || {
customerName: "",
};
const { finalRecommendations, customerNote, offerDiscount } =
context.getStepResult(reviewRecommendations) || {
finalRecommendations: [],
customerNote: "",
offerDiscount: false,
};
// Generate email content based on the recommendations
let emailContent = `Dear ${customerName},\n\nBased on your preferences, we recommend:\n\n`;
finalRecommendations.forEach((product) => {
emailContent += `- ${product.productName}: $${product.price.toFixed(2)}\n`;
});
if (offerDiscount) {
emailContent +=
"\nAs a valued customer, use code SAVE10 for 10% off your next purchase!\n";
}
if (customerNote) {
emailContent += `\nPersonal note: ${customerNote}\n`;
}
emailContent += "\nThank you for your business,\nThe Sales Team";
// In a real application, you would send this email
console.log("Email content generated:", emailContent);
return {
emailSent: true,
emailContent,
};
},
});
// Build the workflow
const recommendationWorkflow = new Workflow({
name: "product-recommendation-workflow",
triggerSchema: z.object({
customerName: z.string(),
}),
});
recommendationWorkflow
.step(generateRecommendations)
.then(reviewRecommendations)
.then(sendRecommendations)
.commit();
// Register the workflow
const mastra = new Mastra({
workflows: { recommendationWorkflow },
});
// Example of using the workflow with Inquirer prompts
async function runRecommendationWorkflow() {
const registeredWorkflow = mastra.getWorkflow("recommendationWorkflow");
const run = registeredWorkflow.createRun();
console.log("Starting product recommendation workflow...");
const result = await run.start({
triggerData: {
customerName: "Jane Smith",
},
});
const isReviewStepSuspended =
result.activePaths.get("reviewRecommendations")?.status === "suspended";
// Check if workflow is suspended for human review
if (isReviewStepSuspended) {
const { customerName, recommendations, message } = result.activePaths.get(
"reviewRecommendations",
)?.suspendPayload;
console.log("\n===================================");
console.log(message);
console.log(`Customer: ${customerName}`);
console.log("===================================\n");
// Use Inquirer to collect input from the sales agent in the terminal
console.log("Available product recommendations:");
recommendations.forEach((product, index) => {
console.log(
`${index + 1}. ${product.productName} - $${product.price.toFixed(2)}`,
);
console.log(` ${product.description}\n`);
});
// Let the agent select which products to recommend
const approvedProducts = await checkbox({
message: "Select products to recommend to the customer:",
choices: recommendations.map((product) => ({
name: `${product.productName} ($${product.price.toFixed(2)})`,
value: product.productId,
})),
});
// Let the agent add a personal note
const includeNote = await confirm({
message: "Would you like to add a personal note?",
default: false,
});
let customerNote = "";
if (includeNote) {
customerNote = await input({
message: "Enter your personalized note for the customer:",
});
}
// Ask if a discount should be offered
const offerDiscount = await confirm({
message: "Offer a 10% discount to this customer?",
default: false,
});
console.log("\nSubmitting your review...");
// Resume the workflow with the agent's input
const resumeResult = await run.resume({
stepId: "reviewRecommendations",
context: {
approvedProducts,
customerNote,
offerDiscount,
},
});
console.log("\n===================================");
console.log("Workflow completed!");
console.log("Email content:");
console.log("===================================\n");
console.log(
resumeResult?.results?.sendRecommendations ||
"No email content generated",
);
return resumeResult;
}
return result;
}
// Invoke the workflow with interactive terminal input
runRecommendationWorkflow().catch(console.error);
複数のユーザー入力を伴う高度な例
この例では、コンテンツモデレーションシステムのように、複数回の人間による介入が必要となる、より複雑なワークフローを示します。
import { Mastra } from "@mastra/core";
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
import { select, input } from "@inquirer/prompts";
// Step 1: Receive and analyze content
const analyzeContent = new Step({
id: "analyzeContent",
outputSchema: z.object({
content: z.string(),
aiAnalysisScore: z.number(),
flaggedCategories: z.array(z.string()).optional(),
}),
execute: async ({ context }) => {
const content = context.triggerData.content;
// Simulate AI analysis
const aiAnalysisScore = simulateContentAnalysis(content);
const flaggedCategories =
aiAnalysisScore < 0.7
? ["potentially inappropriate", "needs review"]
: [];
return {
content,
aiAnalysisScore,
flaggedCategories,
};
},
});
// Step 2: Moderate content that needs review
const moderateContent = new Step({
id: "moderateContent",
// Define the schema for human input that will be provided when resuming
inputSchema: z.object({
moderatorDecision: z.enum(["approve", "reject", "modify"]).optional(),
moderatorNotes: z.string().optional(),
modifiedContent: z.string().optional(),
}),
outputSchema: z.object({
moderationResult: z.enum(["approved", "rejected", "modified"]),
moderatedContent: z.string(),
notes: z.string().optional(),
}),
// @ts-ignore
execute: async ({ context, suspend }) => {
const analysisResult = context.getStepResult(analyzeContent);
// Access the input provided when resuming the workflow
const moderatorInput = {
decision: context.inputData?.moderatorDecision,
notes: context.inputData?.moderatorNotes,
modifiedContent: context.inputData?.modifiedContent,
};
// If the AI analysis score is high enough, auto-approve
if (
analysisResult?.aiAnalysisScore > 0.9 &&
!analysisResult?.flaggedCategories?.length
) {
return {
moderationResult: "approved",
moderatedContent: analysisResult.content,
notes: "Auto-approved by system",
};
}
// If we don't have moderator input yet, suspend for human review
if (!moderatorInput.decision) {
await suspend({
content: analysisResult?.content,
aiScore: analysisResult?.aiAnalysisScore,
flaggedCategories: analysisResult?.flaggedCategories,
message: "Please review this content and make a moderation decision",
});
// Placeholder return
return {
moderationResult: "approved",
moderatedContent: "",
};
}
// Process the moderator's decision
switch (moderatorInput.decision) {
case "approve":
return {
moderationResult: "approved",
moderatedContent: analysisResult?.content || "",
notes: moderatorInput.notes || "Approved by moderator",
};
case "reject":
return {
moderationResult: "rejected",
moderatedContent: "",
notes: moderatorInput.notes || "Rejected by moderator",
};
case "modify":
return {
moderationResult: "modified",
moderatedContent:
moderatorInput.modifiedContent || analysisResult?.content || "",
notes: moderatorInput.notes || "Modified by moderator",
};
default:
return {
moderationResult: "rejected",
moderatedContent: "",
notes: "Invalid moderator decision",
};
}
},
});
// Step 3: Apply moderation actions
const applyModeration = new Step({
id: "applyModeration",
outputSchema: z.object({
finalStatus: z.string(),
content: z.string().optional(),
auditLog: z.object({
originalContent: z.string(),
moderationResult: z.string(),
aiScore: z.number(),
timestamp: z.string(),
}),
}),
execute: async ({ context }) => {
const analysisResult = context.getStepResult(analyzeContent);
const moderationResult = context.getStepResult(moderateContent);
// Create audit log
const auditLog = {
originalContent: analysisResult?.content || "",
moderationResult: moderationResult?.moderationResult || "unknown",
aiScore: analysisResult?.aiAnalysisScore || 0,
timestamp: new Date().toISOString(),
};
// Apply moderation action
switch (moderationResult?.moderationResult) {
case "approved":
return {
finalStatus: "Content published",
content: moderationResult.moderatedContent,
auditLog,
};
case "modified":
return {
finalStatus: "Content modified and published",
content: moderationResult.moderatedContent,
auditLog,
};
case "rejected":
return {
finalStatus: "Content rejected",
auditLog,
};
default:
return {
finalStatus: "Error in moderation process",
auditLog,
};
}
},
});
// Build the workflow
const contentModerationWorkflow = new Workflow({
name: "content-moderation-workflow",
triggerSchema: z.object({
content: z.string(),
}),
});
contentModerationWorkflow
.step(analyzeContent)
.then(moderateContent)
.then(applyModeration)
.commit();
// Register the workflow
const mastra = new Mastra({
workflows: { contentModerationWorkflow },
});
// Example of using the workflow with Inquirer prompts
async function runModerationDemo() {
const registeredWorkflow = mastra.getWorkflow("contentModerationWorkflow");
const run = registeredWorkflow.createRun();
// Start the workflow with content that needs review
console.log("コンテンツモデレーションワークフローを開始します...");
const result = await run.start({
triggerData: {
content: "これはモデレーションが必要なユーザー生成コンテンツです。",
},
});
const isReviewStepSuspended =
result.activePaths.get("moderateContent")?.status === "suspended";
// Check if workflow is suspended
if (isReviewStepSuspended) {
const { content, aiScore, flaggedCategories, message } =
result.activePaths.get("moderateContent")?.suspendPayload;
console.log("\n===================================");
console.log(message);
console.log("===================================\n");
console.log("レビュー対象のコンテンツ:");
console.log(content);
console.log(`\nAI分析スコア: ${aiScore}`);
console.log(
`フラグ付けされたカテゴリ: ${flaggedCategories?.join(", ") || "なし"}\n`,
);
// Collect moderator decision using Inquirer
const moderatorDecision = await select({
message: "モデレーションの判断を選択してください:",
choices: [
{ name: "このままコンテンツを承認する", value: "approve" },
{ name: "コンテンツを完全に却下する", value: "reject" },
{ name: "公開前にコンテンツを修正する", value: "modify" },
],
});
// Collect additional information based on decision
let moderatorNotes = "";
let modifiedContent = "";
moderatorNotes = await input({
message: "判断に関するメモを入力してください:",
});
if (moderatorDecision === "modify") {
modifiedContent = await input({
message: "修正後のコンテンツを入力してください:",
default: content,
});
}
console.log("\nモデレーションの判断を送信しています...");
// Resume the workflow with the moderator's input
const resumeResult = await run.resume({
stepId: "moderateContent",
context: {
moderatorDecision,
moderatorNotes,
modifiedContent,
},
});
if (resumeResult?.results?.applyModeration?.status === "success") {
console.log("\n===================================");
console.log(
`モデレーション完了: ${resumeResult?.results?.applyModeration?.output.finalStatus}`,
);
console.log("===================================\n");
if (resumeResult?.results?.applyModeration?.output.content) {
console.log("公開されたコンテンツ:");
console.log(resumeResult.results.applyModeration.output.content);
}
}
return resumeResult;
}
console.log(
"人による介入なしでワークフローが完了しました:",
result.results,
);
return result;
}
// Helper function for AI content analysis simulation
function simulateContentAnalysis(content: string): number {
// In a real application, this would call an AI service
// For the example, we're returning a random score
return Math.random();
}
// Invoke the demo function
runModerationDemo().catch(console.error);
主要な概念
-
サスペンションポイント - ステップの
execute
内でsuspend()
関数を使用して、ワークフローの実行を一時停止します。 -
サスペンションペイロード - サスペンド時に関連データを渡し、人間による意思決定のためのコンテキストを提供します:
await suspend({
messageForHuman: "Please review this data",
data: someImportantData,
});
- ワークフローステータスの確認 - ワークフロー開始後、返されたステータスを確認してサスペンドされているかどうかを判断します:
const result = await workflow.start({ triggerData });
if (result.status === "suspended" && result.suspendedStepId === "stepId") {
// サスペンションの処理
console.log("Workflow is waiting for input:", result.suspendPayload);
}
- 対話型ターミナル入力 - Inquirer などのライブラリを使って対話型プロンプトを作成します:
import { select, input, confirm } from "@inquirer/prompts";
// ワークフローがサスペンドされたとき
if (result.status === "suspended") {
// サスペンドペイロードから情報を表示
console.log(result.suspendPayload.message);
// ユーザー入力を対話的に収集
const decision = await select({
message: "What would you like to do?",
choices: [
{ name: "Approve", value: "approve" },
{ name: "Reject", value: "reject" },
],
});
// 収集した入力でワークフローを再開
await run.resume({
stepId: result.suspendedStepId,
context: { decision },
});
}
- ワークフローの再開 -
resume()
メソッドを使って、人間の入力とともにワークフローの実行を続行します:
const resumeResult = await run.resume({
stepId: "suspendedStepId",
context: {
// このデータは context.inputData としてサスペンドされたステップに渡されます
// また、ステップの inputSchema に準拠している必要があります
userDecision: "approve",
},
});
- 人間のデータ用インプットスキーマ - 人間の入力で再開される可能性のあるステップには、型安全性を確保するためにインプットスキーマを定義します:
const myStep = new Step({
id: "myStep",
inputSchema: z.object({
// このスキーマは resume の context で渡されるデータを検証します
// また、context.inputData として利用可能になります
userDecision: z.enum(["approve", "reject"]),
userComments: z.string().optional(),
}),
execute: async ({ context, suspend }) => {
// 以前のサスペンションからユーザー入力があるか確認
if (context.inputData?.userDecision) {
// ユーザーの決定を処理
return { result: `User decided: ${context.inputData.userDecision}` };
}
// 入力がなければ、人間の判断を待つためにサスペンド
await suspend();
},
});
Human-in-the-loop ワークフローは、自動化と人間の判断を組み合わせたシステムの構築に非常に有効です。例えば:
- コンテンツモデレーションシステム
- 承認ワークフロー
- 監督付きAIシステム
- エスカレーションを伴うカスタマーサービス自動化
GitHubで例を見る