Inngest Workflow
この例では、Mastra を用いて Inngest のワークフローを構築する方法を紹介します。
セットアップ
npm install @mastra/inngest inngest @mastra/core @mastra/deployer @hono/node-server @ai-sdk/openai
docker run --rm -p 8288:8288 \
inngest/inngest \
inngest dev -u http://host.docker.internal:3000/inngest/api
または、公式の Inngest Dev Server ガイド に従って、ローカル開発で Inngest CLI を使用できます。
プランニングエージェントを定義する
場所とそれに対応する天候条件に基づいてアクティビティを計画するため、LLM 呼び出しを活用するプランニングエージェントを定義します。
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
// OpenAI モデルを使用する新しいプランニングエージェントを作成
const planningAgent = new Agent({
name: "planningAgent",
model: openai("gpt-4o"),
instructions: `
You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations.
📅 [Day, Month Date, Year]
═══════════════════════════
🌡️ WEATHER SUMMARY
• Conditions: [brief description]
• Temperature: [X°C/Y°F to A°C/B°F]
• Precipitation: [X% chance]
🌅 MORNING ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🌞 AFTERNOON ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🏠 INDOOR ALTERNATIVES
• [Activity Name] - [Brief description including specific venue]
Ideal for: [weather condition that would trigger this alternative]
⚠️ SPECIAL CONSIDERATIONS
• [Any relevant weather warnings, UV index, wind conditions, etc.]
Guidelines:
- Suggest 2-3 time-specific outdoor activities per day
- Include 1-2 indoor backup options
- For precipitation >50%, lead with indoor activities
- All activities must be specific to the location
- Include specific venues, trails, or locations
- Consider activity intensity based on temperature
- Keep descriptions concise but informative
Maintain this exact formatting for consistency, using the emoji and section headers as shown.
`,
});
export { planningAgent };
アクティビティプランナーのワークフローを定義する
ネットワーク経由で天気を取得するステップ、アクティビティを計画するステップ、屋内アクティビティのみを計画するステップの3つで、アクティビティプランナーのワークフローを定義します。
import { init } from "@mastra/inngest";
import { Inngest } from "inngest";
import { z } from "zod";
const { createWorkflow, createStep } = init(
new Inngest({
id: "mastra",
baseUrl: `http://localhost:8288`,
}),
);
// Helper function to convert weather codes to human-readable descriptions
function getWeatherCondition(code: number): string {
const conditions: Record<number, string> = {
0: "Clear sky",
1: "Mainly clear",
2: "Partly cloudy",
3: "Overcast",
45: "Foggy",
48: "Depositing rime fog",
51: "Light drizzle",
53: "Moderate drizzle",
55: "Dense drizzle",
61: "Slight rain",
63: "Moderate rain",
65: "Heavy rain",
71: "Slight snow fall",
73: "Moderate snow fall",
75: "Heavy snow fall",
95: "Thunderstorm",
};
return conditions[code] || "Unknown";
}
const forecastSchema = z.object({
date: z.string(),
maxTemp: z.number(),
minTemp: z.number(),
precipitationChance: z.number(),
condition: z.string(),
location: z.string(),
});
ステップ 1: 指定した都市の天気データを取得する
const fetchWeather = createStep({
id: "fetch-weather",
description: "Fetches weather forecast for a given city",
inputSchema: z.object({
city: z.string(),
}),
outputSchema: forecastSchema,
execute: async ({ inputData }) => {
if (!inputData) {
throw new Error("Trigger data not found");
}
// Get latitude and longitude for the city
const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1`;
const geocodingResponse = await fetch(geocodingUrl);
const geocodingData = (await geocodingResponse.json()) as {
results: { latitude: number; longitude: number; name: string }[];
};
if (!geocodingData.results?.[0]) {
throw new Error(`Location '${inputData.city}' not found`);
}
const { latitude, longitude, name } = geocodingData.results[0];
// Fetch weather data using the coordinates
const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}¤t=precipitation,weathercode&timezone=auto,&hourly=precipitation_probability,temperature_2m`;
const response = await fetch(weatherUrl);
const data = (await response.json()) as {
current: {
time: string;
precipitation: number;
weathercode: number;
};
hourly: {
precipitation_probability: number[];
temperature_2m: number[];
};
};
const forecast = {
date: new Date().toISOString(),
maxTemp: Math.max(...data.hourly.temperature_2m),
minTemp: Math.min(...data.hourly.temperature_2m),
condition: getWeatherCondition(data.current.weathercode),
location: name,
precipitationChance: data.hourly.precipitation_probability.reduce(
(acc, curr) => Math.max(acc, curr),
0,
),
};
return forecast;
},
});
ステップ 2: 天気に応じて(屋内または屋外の)アクティビティを提案する
const planActivities = createStep({
id: "plan-activities",
description: "天候に応じてアクティビティを提案します",
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const forecast = inputData;
if (!forecast) {
throw new Error("予報データが見つかりません");
}
const prompt = `${forecast.location} の次の天気予報に基づいて、適切なアクティビティを提案してください:
${JSON.stringify(forecast, null, 2)}
`;
const agent = mastra?.getAgent("planningAgent");
if (!agent) {
throw new Error("プランニングエージェントが見つかりません");
}
const response = await agent.stream([
{
role: "user",
content: prompt,
},
]);
let activitiesText = "";
for await (const chunk of response.textStream) {
process.stdout.write(chunk);
activitiesText += chunk;
}
return {
activities: activitiesText,
};
},
});
ステップ 3: 屋内アクティビティのみを提案する(雨天時)
const planIndoorActivities = createStep({
id: "plan-indoor-activities",
description: "天候に応じて屋内アクティビティを提案します",
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const forecast = inputData;
if (!forecast) {
throw new Error("予報データが見つかりません");
}
const prompt = `雨が予想される場合、${forecast.date} の ${forecast.location} 向けに屋内アクティビティを計画してください`;
const agent = mastra?.getAgent("planningAgent");
if (!agent) {
throw new Error("プランニングエージェントが見つかりません");
}
const response = await agent.stream([
{
role: "user",
content: prompt,
},
]);
let activitiesText = "";
for await (const chunk of response.textStream) {
process.stdout.write(chunk);
activitiesText += chunk;
}
return {
activities: activitiesText,
};
},
});
アクティビティプランナーのワークフローを定義する
const activityPlanningWorkflow = createWorkflow({
id: "activity-planning-workflow-step2-if-else",
inputSchema: z.object({
city: z.string().describe("天気を取得する対象の都市"),
}),
outputSchema: z.object({
activities: z.string(),
}),
})
.then(fetchWeather)
.branch([
[
// 降水確率が50%を超える場合は屋内アクティビティを提案する
async ({ inputData }) => {
return inputData?.precipitationChance > 50;
},
planIndoorActivities,
],
[
// それ以外の場合は屋内外を組み合わせた提案を行う
async ({ inputData }) => {
return inputData?.precipitationChance <= 50;
},
planActivities,
],
]);
activityPlanningWorkflow.commit();
export { activityPlanningWorkflow };
Mastra クラスに Agent と Workflow のインスタンスを登録する
mastra
インスタンスに Agent と Workflow を登録します。これにより、Workflow 内から Agent にアクセスできるようになります。
import { Mastra } from "@mastra/core/mastra";
import { serve as inngestServe } from "@mastra/inngest";
import { PinoLogger } from "@mastra/loggers";
import { Inngest } from "inngest";
import { activityPlanningWorkflow } from "./workflows/inngest-workflow";
import { planningAgent } from "./agents/planning-agent";
import { realtimeMiddleware } from "@inngest/realtime";
// ワークフローのオーケストレーションとイベント処理のための Inngest インスタンスを作成
const inngest = new Inngest({
id: "mastra",
baseUrl: `http://localhost:8288`, // ローカル Inngest サーバーの URL
isDev: true,
middleware: [realtimeMiddleware()], // Inngest ダッシュボードでリアルタイム更新を有効化
});
// メインの Mastra インスタンスを作成・構成
export const mastra = new Mastra({
workflows: {
activityPlanningWorkflow,
},
agents: {
planningAgent,
},
server: {
host: "0.0.0.0",
apiRoutes: [
{
path: "/api/inngest", // Inngest がイベントを送信するための API エンドポイント
method: "ALL",
createHandler: async ({ mastra }) => inngestServe({ mastra, inngest }),
},
],
},
logger: new PinoLogger({
name: "Mastra",
level: "info",
}),
});
アクティビティプランナーのワークフローを実行する
ここでは、mastra インスタンスからアクティビティプランナーのワークフローを取得し、run
を作成して、必要な inputData
とともにその run
を実行します。
import { mastra } from "./";
import { serve } from "@hono/node-server";
import {
createHonoServer,
getToolExports,
} from "@mastra/deployer/server";
import { tools } from "#tools";
const app = await createHonoServer(mastra, {
tools: getToolExports(tools),
});
// Inngest がイベントを送信できるよう、サーバーをポート 3000 で起動する
const srv = serve({
fetch: app.fetch,
port: 3000,
});
const workflow = mastra.getWorkflow("activityPlanningWorkflow");
const run = await workflow.createRunAsync();
// 必要な入力データ(都市名)でワークフローを開始する
// これによりワークフローの各ステップがトリガーされ、結果がコンソールにストリーミングされる
const result = await run.start({ inputData: { city: "New York" } });
console.dir(result, { depth: null });
// ワークフローの実行完了後にサーバーを閉じる
srv.close();
ワークフローを実行した後は、Inngest ダッシュボード http://localhost:8288 で、ワークフロー実行をリアルタイムに表示・監視できます。
Inngest フロー制御の設定
Inngest のワークフローは、同時実行数の制限、レート制限、スロットリング、デバウンス、優先度キューイングなどの高度なフロー制御機能をサポートしています。これらの機能は、大規模なワークフロー実行の管理やリソースの過負荷防止に役立ちます。
同時実行制御
同時に実行できるワークフローインスタンス数を制御します:
const workflow = createWorkflow({
id: 'user-processing-workflow',
inputSchema: z.object({ userId: z.string() }),
outputSchema: z.object({ result: z.string() }),
steps: [processUserStep],
// ユーザー ID ごとに同時実行を 10 に制限
concurrency: {
limit: 10,
key: 'event.data.userId' // ユーザー単位の同時実行制御
},
});
レート制限
一定期間内のワークフロー実行回数を制限します:
const workflow = createWorkflow({
id: 'api-sync-workflow',
inputSchema: z.object({ endpoint: z.string() }),
outputSchema: z.object({ status: z.string() }),
steps: [apiSyncStep],
// 1 時間あたり最大 1000 回実行
rateLimit: {
period: '1h',
limit: 1000
},
});
スロットリング
ワークフロー実行間の最小間隔を確保します:
const workflow = createWorkflow({
id: 'email-notification-workflow',
inputSchema: z.object({ organizationId: z.string(), message: z.string() }),
outputSchema: z.object({ sent: z.boolean() }),
steps: [sendEmailStep],
// 組織ごとに 10 秒あたり 1 回のみ実行
throttle: {
period: '10s',
limit: 1,
key: 'event.data.organizationId'
},
});
デバウンス
一定時間新しいイベントが来ない場合にのみ実行します:
const workflow = createWorkflow({
id: 'search-index-workflow',
inputSchema: z.object({ documentId: z.string() }),
outputSchema: z.object({ indexed: z.boolean() }),
steps: [indexDocumentStep],
// 更新が途絶えてから 5 秒待ってインデックス作成
debounce: {
period: '5s',
key: 'event.data.documentId'
},
});
優先度キューイング
ワークフローの実行優先度を設定します:
const workflow = createWorkflow({
id: 'order-processing-workflow',
inputSchema: z.object({ orderId: z.string(), priority: z.number().optional() }),
outputSchema: z.object({ processed: z.boolean() }),
steps: [processOrderStep],
// 優先度の高い注文から先に実行
priority: {
run: 'event.data.priority ?? 50' // 動的な優先度。デフォルトは 50
},
});
複合フロー制御
複数のフロー制御機能を組み合わせられます:
const workflow = createWorkflow({
id: 'comprehensive-workflow',
inputSchema: z.object({
userId: z.string(),
organizationId: z.string(),
priority: z.number().optional()
}),
outputSchema: z.object({ result: z.string() }),
steps: [comprehensiveStep],
// 複数のフロー制御機能
concurrency: {
limit: 5,
key: 'event.data.userId'
},
rateLimit: {
period: '1m',
limit: 100
},
throttle: {
period: '10s',
limit: 1,
key: 'event.data.organizationId'
},
priority: {
run: 'event.data.priority ?? 0'
}
});
すべてのフロー制御機能は任意です。指定がない場合、ワークフローは Inngest のデフォルト動作で実行されます。フロー制御の設定は Inngest のネイティブ実装によって検証され、互換性と正確性が保証されます。
フロー制御オプションとその動作の詳細については、Inngest Flow Control のドキュメント を参照してください。
ワークフロー(レガシー)
以下のリンクは、レガシー版ワークフローのサンプルドキュメントです。