ステップによる並列実行
AI アプリケーションを構築する際、効率を向上させるために複数の独立したタスクを同時に処理する必要がよくあります。
私たちはこの機能を .parallel
メソッドを通じてワークフローの中核部分としています。
セットアップ
npm install @ai-sdk/openai @mastra/core
プランニングエージェントの定義
場所と対応する天気状況を考慮して活動を計画するために、LLM呼び出しを活用するプランニングエージェントを定義します。
agents/planning-agent.ts
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
const llm = openai("gpt-4o");
// Define the planning agent with specific instructions for formatting
// and structuring weather-based activity recommendations
const planningAgent = new Agent({
name: "planningAgent",
model: llm,
instructions: `
You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations.
📅 [Day, Month Date, Year]
═══════════════════════════
🌡️ WEATHER SUMMARY
• Conditions: [brief description]
• Temperature: [X°C/Y°F to A°C/B°F]
• Precipitation: [X% chance]
🌅 MORNING ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🌞 AFTERNOON ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🏠 INDOOR ALTERNATIVES
• [Activity Name] - [Brief description including specific venue]
Ideal for: [weather condition that would trigger this alternative]
⚠️ SPECIAL CONSIDERATIONS
• [Any relevant weather warnings, UV index, wind conditions, etc.]
Guidelines:
- Suggest 2-3 time-specific outdoor activities per day
- Include 1-2 indoor backup options
- For precipitation >50%, lead with indoor activities
- All activities must be specific to the location
- Include specific venues, trails, or locations
- Consider activity intensity based on temperature
- Keep descriptions concise but informative
Maintain this exact formatting for consistency, using the emoji and section headers as shown.
`,
});
export { planningAgent };
統合エージェントの定義
計画された屋内および屋外活動を受け取り、一日の完全なレポートを提供する統合エージェントを定義します。
agents/synthesize-agent.ts
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
const llm = openai("gpt-4o");
// Define the synthesize agent that combines indoor and outdoor activity plans
// into a comprehensive report, considering weather conditions and alternatives
const synthesizeAgent = new Agent({
name: "synthesizeAgent",
model: llm,
instructions: `
You are given two different blocks of text, one about indoor activities and one about outdoor activities.
Make this into a full report about the day and the possibilities depending on whether it rains or not.
`,
});
export { synthesizeAgent };
並列ワークフローの定義
ここでは、プランニングステップと合成ステップの間の並列→逐次フローを調整するワークフローを定義します。
workflows/parallel-workflow.ts
import { z } from 'zod'
import { createStep, createWorkflow } from '@mastra/core/workflows/vNext'
const forecastSchema = z.object({
date: z.string(),
maxTemp: z.number(),
minTemp: z.number(),
precipitationChance: z.number(),
condition: z.string(),
location: z.string(),
});
// 指定された都市の天気データを取得するステップ
// 現在の天気状況と予報を取得するためのAPI呼び出しを行います
const fetchWeather = createStep({
id: "fetch-weather",
description: "指定された都市の天気予報を取得します",
inputSchema: z.object({
city: z.string(),
}),
outputSchema: forecastSchema,
execute: async ({ inputData }) => {
if (!inputData) {
throw new Error("トリガーデータが見つかりません");
}
const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1`
const geocodingResponse = await fetch(geocodingUrl)
const geocodingData = (await geocodingResponse.json()) as {
results: { latitude: number; longitude: number; name: string }[]
}
if (!geocodingData.results?.[0]) {
throw new Error(`場所 '${inputData.city}' が見つかりません`);
}
const { latitude, longitude, name } = geocodingData.results[0]
const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}¤t=precipitation,weathercode&timezone=auto,&hourly=precipitation_probability,temperature_2m`
const response = await fetch(weatherUrl)
const data = (await response.json()) as {
current: {
time: string;
precipitation: number;
weathercode: number;
};
hourly: {
precipitation_probability: number[]
temperature_2m: number[]
}
}
const forecast = {
date: new Date().toISOString(),
maxTemp: Math.max(...data.hourly.temperature_2m),
minTemp: Math.min(...data.hourly.temperature_2m),
condition: getWeatherCondition(data.current.weathercode),
location: name,
precipitationChance: data.hourly.precipitation_probability.reduce(
(acc, curr) => Math.max(acc, curr),
0,
),
}
return forecast
},
});
// 天気状況に基づいて屋外活動を計画するステップ
// プランニングエージェントを使用してアクティビティの推奨事項を生成します
const planActivities = createStep({
id: "plan-activities",
description: "天気状況に基づいてアクティビティを提案します",
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const forecast = inputData
if (!forecast) {
throw new Error("予報データが見つかりません");
}
const prompt = `${forecast.location}の以下の天気予報に基づいて、適切なアクティビティを提案してください:
${JSON.stringify(forecast, null, 2)}
`
const agent = mastra?.getAgent('planningAgent')
if (!agent) {
throw new Error("プランニングエージェントが見つかりません");
}
const response = await agent.stream([
{
role: "user",
content: prompt,
},
])
let activitiesText = ''
for await (const chunk of response.textStream) {
process.stdout.write(chunk);
activitiesText += chunk;
}
return {
activities: activitiesText,
};
},
})
// 天気コードを人間が読める状態に変換するヘルパー関数
// 天気APIからの数値コードを説明的な文字列にマッピングします
function getWeatherCondition(code: number): string {
const conditions: Record<number, string> = {
0: "快晴",
1: "ほぼ晴れ",
2: "部分的に曇り",
3: "曇り",
45: "霧",
48: "着氷性の霧",
51: "軽い霧雨",
53: "中程度の霧雨",
55: "濃い霧雨",
61: "小雨",
63: "中程度の雨",
65: "大雨",
71: "小雪",
73: "中程度の雪",
75: "大雪",
95: "雷雨",
};
return conditions[code] || "不明";
}
// 悪天候に備えて室内アクティビティを計画するステップ
// 悪天候の場合の代替室内アクティビティを生成します
const planIndoorActivities = createStep({
id: "plan-indoor-activities",
description: "天候状況に基づいて室内アクティビティを提案します",
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const forecast = inputData
if (!forecast) {
throw new Error("Forecast data not found");
}
const prompt = `雨が降った場合、${forecast.location}の${forecast.date}のための室内アクティビティを計画してください`
const agent = mastra?.getAgent('planningAgent')
if (!agent) {
throw new Error("Planning agent not found");
}
const response = await agent.stream([
{
role: "user",
content: prompt,
},
])
let activitiesText = ''
for await (const chunk of response.textStream) {
activitiesText += chunk;
}
return {
activities: activitiesText,
};
},
});
// 室内/屋外アクティビティ計画を統合するステップ
// 両方のオプションを考慮した包括的な計画を作成します
const synthesizeStep = createStep({
id: 'sythesize-step',
description: '室内と屋外のアクティビティの結果を統合します',
inputSchema: z.object({
"plan-activities": z.object({
activities: z.string(),
}),
"plan-indoor-activities": z.object({
activities: z.string(),
}),
}),
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const indoorActivities = inputData?.['plan-indoor-activities']
const outdoorActivities = inputData?.['plan-activities']
const prompt = `室内アクティビティ:
${indoorActivities?.activities}
屋外アクティビティ:
${outdoorActivities?.activities}
雨が降る可能性があるので、必要に応じて室内アクティビティを行う準備をしておいてください。`
const agent = mastra?.getAgent('synthesizeAgent')
if (!agent) {
throw new Error('Synthesize agent not found')
}
const response = await agent.stream([
{
role: "user",
content: prompt,
},
])
let activitiesText = ''
for await (const chunk of response.textStream) {
process.stdout.write(chunk);
activitiesText += chunk;
}
return {
activities: activitiesText,
};
},
})
const activityPlanningWorkflow = createWorkflow({
id: 'plan-both-workflow',
inputSchema: z.object({
city: z.string(),
}),
outputSchema: z.object({
activities: z.string(),
}),
steps: [fetchWeather, planActivities, planIndoorActivities, synthesizeStep]
})
.then(fetchWeather)
.parallel([planActivities, planIndoorActivities])
.then(synthesizeStep)
.commit()
export { activityPlanningWorkflow }
Mastraクラスでエージェントとワークフローのインスタンスを登録する
エージェントとワークフローをmastraインスタンスに登録します。 これはワークフロー内でエージェントへのアクセスを可能にするために重要です。
index.ts
import { Mastra } from '@mastra/core/mastra'
import { createLogger } from '@mastra/core/logger'
import { activityPlanningWorkflow } from './workflows/parallel-workflow'
import { planningAgent } from './agents/planning-agent'
import { synthesizeAgent } from './agents/synthesize-agent'
// Initialize Mastra with required agents and workflows
// This setup enables agent access within the workflow steps
const mastra = new Mastra({
vnext_workflows: {
activityPlanningWorkflow,
},
agents: {
planningAgent,
synthesizeAgent,
},
logger: createLogger({
name: "Mastra",
level: "info",
}),
})
export { mastra }
アクティビティプランニングワークフローを実行する
ここでは、mastraインスタンスから天気ワークフローを取得し、実行を作成して、必要なinputDataで作成した実行を実行します。
exec.ts
import { mastra } from "./";
const workflow = mastra.vnext_getWorkflow('activityPlanningWorkflow')
const run = workflow.createRun()
// Execute the workflow with a specific city
// This will run through all steps and generate activity recommendations
const result = await run.start({ inputData: { city: 'Ibiza' } })
console.dir(result, { depth: null })