Skip to Content
ワークフロー例:Human in the Loop | ワークフロー | Mastra ドキュメント

Human in the Loop Workflow

Human-in-the-loop ワークフローでは、特定のポイントで実行を一時停止し、ユーザーからの入力を収集したり、意思決定を行ったり、人間の判断が必要なアクションを実行したりすることができます。この例では、人間による介入ポイントを含むワークフローの作成方法を示します。

仕組み

  1. ワークフローステップは、suspend() 関数を使って実行を一時停止することができ、オプションで人間の意思決定者のためのコンテキストを含むペイロードを渡すことができます。
  2. ワークフローが再開されると、人間の入力は resume() 呼び出しの context パラメータに渡されます。
  3. この入力は、ステップの実行コンテキスト内で context.inputData として利用可能になり、ステップの inputSchema に従った型になります。
  4. その後、ステップは人間の入力に基づいて実行を続行できます。

このパターンにより、自動化されたワークフローにおいて、安全で型チェックされた人間の介入が可能になります。

Inquirer を使った対話型ターミナル例

この例では、Inquirer ライブラリを使用して、ワークフローが一時停止した際にターミナルから直接ユーザー入力を収集し、真にインタラクティブな human-in-the-loop 体験を実現する方法を示します。

import { Mastra } from "@mastra/core"; import { Step, Workflow } from "@mastra/core/workflows"; import { z } from "zod"; import { confirm, input, select } from "@inquirer/prompts"; // Step 1: Generate product recommendations const generateRecommendations = new Step({ id: "generateRecommendations", outputSchema: z.object({ customerName: z.string(), recommendations: z.array( z.object({ productId: z.string(), productName: z.string(), price: z.number(), description: z.string(), }), ), }), execute: async ({ context }) => { const customerName = context.triggerData.customerName; // In a real application, you might call an API or ML model here // For this example, we'll return mock data return { customerName, recommendations: [ { productId: "prod-001", productName: "Premium Widget", price: 99.99, description: "Our best-selling premium widget with advanced features", }, { productId: "prod-002", productName: "Basic Widget", price: 49.99, description: "Affordable entry-level widget for beginners", }, { productId: "prod-003", productName: "Widget Pro Plus", price: 149.99, description: "Professional-grade widget with extended warranty", }, ], }; }, });
// Step 2: Get human approval and customization for the recommendations const reviewRecommendations = new Step({ id: "reviewRecommendations", inputSchema: z.object({ approvedProducts: z.array(z.string()), customerNote: z.string().optional(), offerDiscount: z.boolean().optional(), }), outputSchema: z.object({ finalRecommendations: z.array( z.object({ productId: z.string(), productName: z.string(), price: z.number(), }), ), customerNote: z.string().optional(), offerDiscount: z.boolean(), }), execute: async ({ context, suspend }) => { const { customerName, recommendations } = context.getStepResult( generateRecommendations, ) || { customerName: "", recommendations: [], }; // Check if we have input from a resumed workflow const reviewInput = { approvedProducts: context.inputData?.approvedProducts || [], customerNote: context.inputData?.customerNote, offerDiscount: context.inputData?.offerDiscount, }; // If we don't have agent input yet, suspend for human review if (!reviewInput.approvedProducts.length) { console.log(`Generating recommendations for customer: ${customerName}`); await suspend({ customerName, recommendations, message: "Please review these product recommendations before sending to the customer", }); // Placeholder return (won't be reached due to suspend) return { finalRecommendations: [], customerNote: "", offerDiscount: false, }; } // Process the agent's product selections const finalRecommendations = recommendations .filter((product) => reviewInput.approvedProducts.includes(product.productId), ) .map((product) => ({ productId: product.productId, productName: product.productName, price: product.price, })); return { finalRecommendations, customerNote: reviewInput.customerNote || "", offerDiscount: reviewInput.offerDiscount || false, }; }, });
// Step 3: Send the recommendations to the customer const sendRecommendations = new Step({ id: "sendRecommendations", outputSchema: z.object({ emailSent: z.boolean(), emailContent: z.string(), }), execute: async ({ context }) => { const { customerName } = context.getStepResult(generateRecommendations) || { customerName: "", }; const { finalRecommendations, customerNote, offerDiscount } = context.getStepResult(reviewRecommendations) || { finalRecommendations: [], customerNote: "", offerDiscount: false, }; // Generate email content based on the recommendations let emailContent = `Dear ${customerName},\n\nBased on your preferences, we recommend:\n\n`; finalRecommendations.forEach((product) => { emailContent += `- ${product.productName}: $${product.price.toFixed(2)}\n`; }); if (offerDiscount) { emailContent += "\nAs a valued customer, use code SAVE10 for 10% off your next purchase!\n"; } if (customerNote) { emailContent += `\nPersonal note: ${customerNote}\n`; } emailContent += "\nThank you for your business,\nThe Sales Team"; // In a real application, you would send this email console.log("Email content generated:", emailContent); return { emailSent: true, emailContent, }; }, }); // Build the workflow const recommendationWorkflow = new Workflow({ name: "product-recommendation-workflow", triggerSchema: z.object({ customerName: z.string(), }), }); recommendationWorkflow .step(generateRecommendations) .then(reviewRecommendations) .then(sendRecommendations) .commit(); // Register the workflow const mastra = new Mastra({ workflows: { recommendationWorkflow }, });
// Example of using the workflow with Inquirer prompts async function runRecommendationWorkflow() { const registeredWorkflow = mastra.getWorkflow("recommendationWorkflow"); const run = registeredWorkflow.createRun(); console.log("Starting product recommendation workflow..."); const result = await run.start({ triggerData: { customerName: "Jane Smith", }, }); const isReviewStepSuspended = result.activePaths.get("reviewRecommendations")?.status === "suspended"; // Check if workflow is suspended for human review if (isReviewStepSuspended) { const { customerName, recommendations, message } = result.activePaths.get( "reviewRecommendations", )?.suspendPayload; console.log("\n==================================="); console.log(message); console.log(`Customer: ${customerName}`); console.log("===================================\n"); // Use Inquirer to collect input from the sales agent in the terminal console.log("Available product recommendations:"); recommendations.forEach((product, index) => { console.log( `${index + 1}. ${product.productName} - $${product.price.toFixed(2)}`, ); console.log(` ${product.description}\n`); }); // Let the agent select which products to recommend const approvedProducts = await checkbox({ message: "Select products to recommend to the customer:", choices: recommendations.map((product) => ({ name: `${product.productName} ($${product.price.toFixed(2)})`, value: product.productId, })), }); // Let the agent add a personal note const includeNote = await confirm({ message: "Would you like to add a personal note?", default: false, }); let customerNote = ""; if (includeNote) { customerNote = await input({ message: "Enter your personalized note for the customer:", }); } // Ask if a discount should be offered const offerDiscount = await confirm({ message: "Offer a 10% discount to this customer?", default: false, }); console.log("\nSubmitting your review..."); // Resume the workflow with the agent's input const resumeResult = await run.resume({ stepId: "reviewRecommendations", context: { approvedProducts, customerNote, offerDiscount, }, }); console.log("\n==================================="); console.log("Workflow completed!"); console.log("Email content:"); console.log("===================================\n"); console.log( resumeResult?.results?.sendRecommendations || "No email content generated", ); return resumeResult; } return result; } // Invoke the workflow with interactive terminal input runRecommendationWorkflow().catch(console.error);

複数のユーザー入力を伴う高度な例

この例では、コンテンツモデレーションシステムのように、複数回の人間による介入が必要となる、より複雑なワークフローを示します。

import { Mastra } from "@mastra/core"; import { Step, Workflow } from "@mastra/core/workflows"; import { z } from "zod"; import { select, input } from "@inquirer/prompts"; // Step 1: Receive and analyze content const analyzeContent = new Step({ id: "analyzeContent", outputSchema: z.object({ content: z.string(), aiAnalysisScore: z.number(), flaggedCategories: z.array(z.string()).optional(), }), execute: async ({ context }) => { const content = context.triggerData.content; // Simulate AI analysis const aiAnalysisScore = simulateContentAnalysis(content); const flaggedCategories = aiAnalysisScore < 0.7 ? ["potentially inappropriate", "needs review"] : []; return { content, aiAnalysisScore, flaggedCategories, }; }, });
// Step 2: Moderate content that needs review const moderateContent = new Step({ id: "moderateContent", // Define the schema for human input that will be provided when resuming inputSchema: z.object({ moderatorDecision: z.enum(["approve", "reject", "modify"]).optional(), moderatorNotes: z.string().optional(), modifiedContent: z.string().optional(), }), outputSchema: z.object({ moderationResult: z.enum(["approved", "rejected", "modified"]), moderatedContent: z.string(), notes: z.string().optional(), }), // @ts-ignore execute: async ({ context, suspend }) => { const analysisResult = context.getStepResult(analyzeContent); // Access the input provided when resuming the workflow const moderatorInput = { decision: context.inputData?.moderatorDecision, notes: context.inputData?.moderatorNotes, modifiedContent: context.inputData?.modifiedContent, }; // If the AI analysis score is high enough, auto-approve if ( analysisResult?.aiAnalysisScore > 0.9 && !analysisResult?.flaggedCategories?.length ) { return { moderationResult: "approved", moderatedContent: analysisResult.content, notes: "Auto-approved by system", }; } // If we don't have moderator input yet, suspend for human review if (!moderatorInput.decision) { await suspend({ content: analysisResult?.content, aiScore: analysisResult?.aiAnalysisScore, flaggedCategories: analysisResult?.flaggedCategories, message: "Please review this content and make a moderation decision", }); // Placeholder return return { moderationResult: "approved", moderatedContent: "", }; } // Process the moderator's decision switch (moderatorInput.decision) { case "approve": return { moderationResult: "approved", moderatedContent: analysisResult?.content || "", notes: moderatorInput.notes || "Approved by moderator", }; case "reject": return { moderationResult: "rejected", moderatedContent: "", notes: moderatorInput.notes || "Rejected by moderator", }; case "modify": return { moderationResult: "modified", moderatedContent: moderatorInput.modifiedContent || analysisResult?.content || "", notes: moderatorInput.notes || "Modified by moderator", }; default: return { moderationResult: "rejected", moderatedContent: "", notes: "Invalid moderator decision", }; } }, });
// Step 3: Apply moderation actions const applyModeration = new Step({ id: "applyModeration", outputSchema: z.object({ finalStatus: z.string(), content: z.string().optional(), auditLog: z.object({ originalContent: z.string(), moderationResult: z.string(), aiScore: z.number(), timestamp: z.string(), }), }), execute: async ({ context }) => { const analysisResult = context.getStepResult(analyzeContent); const moderationResult = context.getStepResult(moderateContent); // Create audit log const auditLog = { originalContent: analysisResult?.content || "", moderationResult: moderationResult?.moderationResult || "unknown", aiScore: analysisResult?.aiAnalysisScore || 0, timestamp: new Date().toISOString(), }; // Apply moderation action switch (moderationResult?.moderationResult) { case "approved": return { finalStatus: "Content published", content: moderationResult.moderatedContent, auditLog, }; case "modified": return { finalStatus: "Content modified and published", content: moderationResult.moderatedContent, auditLog, }; case "rejected": return { finalStatus: "Content rejected", auditLog, }; default: return { finalStatus: "Error in moderation process", auditLog, }; } }, });
// Build the workflow const contentModerationWorkflow = new Workflow({ name: "content-moderation-workflow", triggerSchema: z.object({ content: z.string(), }), }); contentModerationWorkflow .step(analyzeContent) .then(moderateContent) .then(applyModeration) .commit(); // Register the workflow const mastra = new Mastra({ workflows: { contentModerationWorkflow }, }); // Example of using the workflow with Inquirer prompts async function runModerationDemo() { const registeredWorkflow = mastra.getWorkflow("contentModerationWorkflow"); const run = registeredWorkflow.createRun(); // Start the workflow with content that needs review console.log("コンテンツモデレーションワークフローを開始します..."); const result = await run.start({ triggerData: { content: "これはモデレーションが必要なユーザー生成コンテンツです。", }, }); const isReviewStepSuspended = result.activePaths.get("moderateContent")?.status === "suspended"; // Check if workflow is suspended if (isReviewStepSuspended) { const { content, aiScore, flaggedCategories, message } = result.activePaths.get("moderateContent")?.suspendPayload; console.log("\n==================================="); console.log(message); console.log("===================================\n"); console.log("レビュー対象のコンテンツ:"); console.log(content); console.log(`\nAI分析スコア: ${aiScore}`); console.log( `フラグ付けされたカテゴリ: ${flaggedCategories?.join(", ") || "なし"}\n`, ); // Collect moderator decision using Inquirer const moderatorDecision = await select({ message: "モデレーションの判断を選択してください:", choices: [ { name: "このままコンテンツを承認する", value: "approve" }, { name: "コンテンツを完全に却下する", value: "reject" }, { name: "公開前にコンテンツを修正する", value: "modify" }, ], }); // Collect additional information based on decision let moderatorNotes = ""; let modifiedContent = ""; moderatorNotes = await input({ message: "判断に関するメモを入力してください:", }); if (moderatorDecision === "modify") { modifiedContent = await input({ message: "修正後のコンテンツを入力してください:", default: content, }); } console.log("\nモデレーションの判断を送信しています..."); // Resume the workflow with the moderator's input const resumeResult = await run.resume({ stepId: "moderateContent", context: { moderatorDecision, moderatorNotes, modifiedContent, }, }); if (resumeResult?.results?.applyModeration?.status === "success") { console.log("\n==================================="); console.log( `モデレーション完了: ${resumeResult?.results?.applyModeration?.output.finalStatus}`, ); console.log("===================================\n"); if (resumeResult?.results?.applyModeration?.output.content) { console.log("公開されたコンテンツ:"); console.log(resumeResult.results.applyModeration.output.content); } } return resumeResult; } console.log( "人による介入なしでワークフローが完了しました:", result.results, ); return result; } // Helper function for AI content analysis simulation function simulateContentAnalysis(content: string): number { // In a real application, this would call an AI service // For the example, we're returning a random score return Math.random(); } // Invoke the demo function runModerationDemo().catch(console.error);

主要な概念

  1. サスペンションポイント - ステップの execute 内で suspend() 関数を使用して、ワークフローの実行を一時停止します。

  2. サスペンションペイロード - サスペンド時に関連データを渡し、人間による意思決定のためのコンテキストを提供します:

await suspend({ messageForHuman: "Please review this data", data: someImportantData, });
  1. ワークフローステータスの確認 - ワークフロー開始後、返されたステータスを確認してサスペンドされているかどうかを判断します:
const result = await workflow.start({ triggerData }); if (result.status === "suspended" && result.suspendedStepId === "stepId") { // サスペンションの処理 console.log("Workflow is waiting for input:", result.suspendPayload); }
  1. 対話型ターミナル入力 - Inquirer などのライブラリを使って対話型プロンプトを作成します:
import { select, input, confirm } from "@inquirer/prompts"; // ワークフローがサスペンドされたとき if (result.status === "suspended") { // サスペンドペイロードから情報を表示 console.log(result.suspendPayload.message); // ユーザー入力を対話的に収集 const decision = await select({ message: "What would you like to do?", choices: [ { name: "Approve", value: "approve" }, { name: "Reject", value: "reject" }, ], }); // 収集した入力でワークフローを再開 await run.resume({ stepId: result.suspendedStepId, context: { decision }, }); }
  1. ワークフローの再開 - resume() メソッドを使って、人間の入力とともにワークフローの実行を続行します:
const resumeResult = await run.resume({ stepId: "suspendedStepId", context: { // このデータは context.inputData としてサスペンドされたステップに渡されます // また、ステップの inputSchema に準拠している必要があります userDecision: "approve", }, });
  1. 人間のデータ用インプットスキーマ - 人間の入力で再開される可能性のあるステップには、型安全性を確保するためにインプットスキーマを定義します:
const myStep = new Step({ id: "myStep", inputSchema: z.object({ // このスキーマは resume の context で渡されるデータを検証します // また、context.inputData として利用可能になります userDecision: z.enum(["approve", "reject"]), userComments: z.string().optional(), }), execute: async ({ context, suspend }) => { // 以前のサスペンションからユーザー入力があるか確認 if (context.inputData?.userDecision) { // ユーザーの決定を処理 return { result: `User decided: ${context.inputData.userDecision}` }; } // 入力がなければ、人間の判断を待つためにサスペンド await suspend(); }, });

Human-in-the-loop ワークフローは、自動化と人間の判断を組み合わせたシステムの構築に非常に有効です。例えば:

  • コンテンツモデレーションシステム
  • 承認ワークフロー
  • 監督付きAIシステム
  • エスカレーションを伴うカスタマーサービス自動化





GitHubで例を見る