Skip to Content
ワークフロー並列ステップ

ステップによる並列実行

AIアプリケーションを構築する際、効率を高めるために複数の独立したタスクを同時に処理する必要がよくあります。 この機能をワークフローの中核として、.parallel メソッドを通じて提供しています。

セットアップ

npm install @ai-sdk/openai @mastra/core

プランニングエージェントの定義

場所とそれに対応する天候条件が与えられた場合に、LLMコールを活用してアクティビティを計画するプランニングエージェントを定義します。

agents/planning-agent.ts
import { Agent } from "@mastra/core/agent"; import { openai } from "@ai-sdk/openai"; const llm = openai("gpt-4o"); // Define the planning agent with specific instructions for formatting // and structuring weather-based activity recommendations const planningAgent = new Agent({ name: "planningAgent", model: llm, instructions: ` You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations. 📅 [Day, Month Date, Year] ═══════════════════════════ 🌡️ WEATHER SUMMARY • Conditions: [brief description] • Temperature: [X°C/Y°F to A°C/B°F] • Precipitation: [X% chance] 🌅 MORNING ACTIVITIES Outdoor: • [Activity Name] - [Brief description including specific location/route] Best timing: [specific time range] Note: [relevant weather consideration] 🌞 AFTERNOON ACTIVITIES Outdoor: • [Activity Name] - [Brief description including specific location/route] Best timing: [specific time range] Note: [relevant weather consideration] 🏠 INDOOR ALTERNATIVES • [Activity Name] - [Brief description including specific venue] Ideal for: [weather condition that would trigger this alternative] ⚠️ SPECIAL CONSIDERATIONS • [Any relevant weather warnings, UV index, wind conditions, etc.] Guidelines: - Suggest 2-3 time-specific outdoor activities per day - Include 1-2 indoor backup options - For precipitation >50%, lead with indoor activities - All activities must be specific to the location - Include specific venues, trails, or locations - Consider activity intensity based on temperature - Keep descriptions concise but informative Maintain this exact formatting for consistency, using the emoji and section headers as shown. `, }); export { planningAgent };

Synthesize Agent の定義

屋内および屋外のアクティビティ計画を受け取り、その日の全体的なレポートを提供する synthesize agent を定義します。

agents/synthesize-agent.ts
import { Agent } from "@mastra/core/agent"; import { openai } from "@ai-sdk/openai"; const llm = openai("gpt-4o"); // Define the synthesize agent that combines indoor and outdoor activity plans // into a comprehensive report, considering weather conditions and alternatives const synthesizeAgent = new Agent({ name: "synthesizeAgent", model: llm, instructions: ` You are given two different blocks of text, one about indoor activities and one about outdoor activities. Make this into a full report about the day and the possibilities depending on whether it rains or not. `, }); export { synthesizeAgent };

並列ワークフローの定義

ここでは、計画ステップと合成ステップの間で並列から順次へのフローをオーケストレーションするワークフローを定義します。

workflows/parallel-workflow.ts
import { z } from "zod"; import { createStep, createWorkflow } from "@mastra/core/workflows"; const forecastSchema = z.object({ date: z.string(), maxTemp: z.number(), minTemp: z.number(), precipitationChance: z.number(), condition: z.string(), location: z.string(), }); // Step to fetch weather data for a given city // Makes API calls to get current weather conditions and forecast const fetchWeather = createStep({ id: "fetch-weather", description: "Fetches weather forecast for a given city", inputSchema: z.object({ city: z.string(), }), outputSchema: forecastSchema, execute: async ({ inputData }) => { if (!inputData) { throw new Error("Trigger data not found"); } const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1`; const geocodingResponse = await fetch(geocodingUrl); const geocodingData = (await geocodingResponse.json()) as { results: { latitude: number; longitude: number; name: string }[]; }; if (!geocodingData.results?.[0]) { throw new Error(`Location '${inputData.city}' not found`); } const { latitude, longitude, name } = geocodingData.results[0]; const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}&current=precipitation,weathercode&timezone=auto,&hourly=precipitation_probability,temperature_2m`; const response = await fetch(weatherUrl); const data = (await response.json()) as { current: { time: string; precipitation: number; weathercode: number; }; hourly: { precipitation_probability: number[]; temperature_2m: number[]; }; }; const forecast = { date: new Date().toISOString(), maxTemp: Math.max(...data.hourly.temperature_2m), minTemp: Math.min(...data.hourly.temperature_2m), condition: getWeatherCondition(data.current.weathercode), location: name, precipitationChance: data.hourly.precipitation_probability.reduce( (acc, curr) => Math.max(acc, curr), 0, ), }; return forecast; }, });

天候条件に基づいて屋外アクティビティを計画するステップ

計画エージェントを使用してアクティビティの提案を生成します

workflows/parallel-workflow.ts
const planActivities = createStep({ id: "plan-activities", description: "Suggests activities based on weather conditions", inputSchema: forecastSchema, outputSchema: z.object({ activities: z.string(), }), execute: async ({ inputData, mastra }) => { const forecast = inputData; if (!forecast) { throw new Error("Forecast data not found"); } const prompt = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities: ${JSON.stringify(forecast, null, 2)} `; const agent = mastra?.getAgent("planningAgent"); if (!agent) { throw new Error("Planning agent not found"); } const response = await agent.stream([ { role: "user", content: prompt, }, ]); let activitiesText = ""; for await (const chunk of response.textStream) { process.stdout.write(chunk); activitiesText += chunk; } return { activities: activitiesText, }; }, });

天気コードを人間が読める条件に変換するヘルパー関数

天気APIからの数値コードを説明的な文字列にマッピングします

workflows/parallel-workflow.ts
function getWeatherCondition(code: number): string { const conditions: Record<number, string> = { 0: "Clear sky", 1: "Mainly clear", 2: "Partly cloudy", 3: "Overcast", 45: "Foggy", 48: "Depositing rime fog", 51: "Light drizzle", 53: "Moderate drizzle", 55: "Dense drizzle", 61: "Slight rain", 63: "Moderate rain", 65: "Heavy rain", 71: "Slight snow fall", 73: "Moderate snow fall", 75: "Heavy snow fall", 95: "Thunderstorm", }; return conditions[code] || "Unknown"; } // Step to plan indoor activities as backup options // Generates alternative indoor activities in case of bad weather const planIndoorActivities = createStep({ id: "plan-indoor-activities", description: "Suggests indoor activities based on weather conditions", inputSchema: forecastSchema, outputSchema: z.object({ activities: z.string(), }), execute: async ({ inputData, mastra }) => { const forecast = inputData; if (!forecast) { throw new Error("Forecast data not found"); } const prompt = `In case it rains, plan indoor activities for ${forecast.location} on ${forecast.date}`; const agent = mastra?.getAgent("planningAgent"); if (!agent) { throw new Error("Planning agent not found"); } const response = await agent.stream([ { role: "user", content: prompt, }, ]); let activitiesText = ""; for await (const chunk of response.textStream) { activitiesText += chunk; } return { activities: activitiesText, }; }, });

屋内・屋外アクティビティ計画を統合・合成するステップ

両方の選択肢を考慮した包括的な計画を作成します

workflows/parallel-workflow.ts
const synthesizeStep = createStep({ id: "sythesize-step", description: "Synthesizes the results of the indoor and outdoor activities", inputSchema: z.object({ "plan-activities": z.object({ activities: z.string(), }), "plan-indoor-activities": z.object({ activities: z.string(), }), }), outputSchema: z.object({ activities: z.string(), }), execute: async ({ inputData, mastra }) => { const indoorActivities = inputData?.["plan-indoor-activities"]; const outdoorActivities = inputData?.["plan-activities"]; const prompt = `Indoor activities: ${indoorActivities?.activities} Outdoor activities: ${outdoorActivities?.activities} There is a chance of rain so be prepared to do indoor activities if needed.`; const agent = mastra?.getAgent("synthesizeAgent"); if (!agent) { throw new Error("Synthesize agent not found"); } const response = await agent.stream([ { role: "user", content: prompt, }, ]); let activitiesText = ""; for await (const chunk of response.textStream) { process.stdout.write(chunk); activitiesText += chunk; } return { activities: activitiesText, }; }, });

メインワークフロー

workflows/parallel-workflow.ts
const activityPlanningWorkflow = createWorkflow({ id: "plan-both-workflow", inputSchema: z.object({ city: z.string(), }), outputSchema: z.object({ activities: z.string(), }), steps: [fetchWeather, planActivities, planIndoorActivities, synthesizeStep], }) .then(fetchWeather) .parallel([planActivities, planIndoorActivities]) .then(synthesizeStep) .commit(); export { activityPlanningWorkflow };

MastraクラスでAgentとWorkflowインスタンスを登録する

エージェントとワークフローをmastraインスタンスに登録します。 これは、ワークフロー内でエージェントへのアクセスを可能にするために重要です。

index.ts
import { Mastra } from "@mastra/core/mastra"; import { createLogger } from "@mastra/core/logger"; import { activityPlanningWorkflow } from "./workflows/parallel-workflow"; import { planningAgent } from "./agents/planning-agent"; import { synthesizeAgent } from "./agents/synthesize-agent"; // Initialize Mastra with required agents and workflows // This setup enables agent access within the workflow steps const mastra = new Mastra({ workflows: { activityPlanningWorkflow, }, agents: { planningAgent, synthesizeAgent, }, logger: createLogger({ name: "Mastra", level: "info", }), }); export { mastra };

アクティビティ計画ワークフローを実行する

ここでは、mastra インスタンスから天気ワークフローを取得し、実行を作成して、必要な inputData を使ってその実行を開始します。

exec.ts
import { mastra } from "./"; const workflow = mastra.getWorkflow("activityPlanningWorkflow"); const run = workflow.createRun(); // Execute the workflow with a specific city // This will run through all steps and generate activity recommendations const result = await run.start({ inputData: { city: "Ibiza" } }); console.dir(result, { depth: null });