Dagster + NATS JetStream イベントパイプラインの実装詳解
agent-gateway から fire-and-forget で publish されたイベントを Dagster sensor が pull subscribe し、asset materialization するまでの実装詳解。GraphQL trigger bridge から native sensor への進化、テーブル駆動ルーティング、PostgreSQL スキーマ設計、冪等性保証を記録。Chat Pair データセット、Knowledge Event アセット、Go 側ブリッジ汎用化の実装詳細を含む。
結論
Go + NATS + Dagster AI オーケストレーション基盤の中で、Dagster + NATS JetStream によるイベントパイプラインは gateway と完全に分離された消費側の実装である。この記事ではその消費側の実装詳細を掘り下げる。
設計の核心は「gateway は publish only、Dagster sensor が独立に pull subscribe する」という責務分離にある。初期は NATS Consumer Worker + GraphQL API trigger bridge で実装したが、最終的に Dagster native sensor に一本化した。テーブル駆動のトピック → ジョブマッピング、PostgreSQL の冪等スキーマ、correlation_id による全経路トレーシングがこのパイプラインの骨格になっている。
Knowledge モジュールの主要操作——Chat、Embedding、Retrieve、Compose——はすべて NATS → Dagster のパイプラインとして計装されている。各操作は Dagster Software-Defined Asset のペア(record → dataset)として定義され、Dagit UI 上でリネージグラフとして可視化される。Go 側のブリッジも pipelineRoute 構造体によるテーブル駆動設計で統一されている。
前提
この記事は以下の基盤上で動作するパイプラインの詳解である。全体アーキテクチャはGo 基盤の設計全記録を参照。
- メッセージング: NATS 2.11 (JetStream 有効)
- オーケストレーション: Dagster (Python) — daemon + sensor + asset materialization
- データストア: PostgreSQL 18 + pgvector (JIT 有効)
- イベント発行元: agent-gateway (Go/Gin) — fire-and-forget publish
なぜイベント駆動にしたか
初期段階では Knowledge Service がチャットレスポンス直後に PostgreSQL へ同期書き込みしていた。以下の問題が顕在化した。
| 問題 | 影響 |
|---|---|
| レスポンス遅延 | 後処理がユーザー向けレスポンス時間に含まれる |
| スケーラビリティ | 複数の後処理ステップが逐次実行される |
| 可観測性 | どの処理がいつ失敗したか把握困難 |
| 再現性 | RAG ロジックの検証・改善が困難 |
解決策として、gateway はイベントを publish するだけにし、消費側(Dagster)が独立に処理する構成に切り替えた。これによりレスポンス遅延はゼロになり、後処理の追加・変更が gateway に影響しなくなった。
Consumer アーキテクチャの進化
Phase 1: NATS Consumer Worker + GraphQL bridge
初期実装では、独立した Python プロセスが NATS を subscribe し、Dagster GraphQL API を呼び出してジョブを起動していた。
agent-gateway → NATS pub → Consumer Worker → GraphQL mutation → Dagster job
async def message_handler(msg):
payload = json.loads(msg.data.decode())
submit_dagster_run(
job_name="knowledge_chat_persist",
payload=payload,
correlation_id=payload.get("correlation_id", "")
)
GraphQL mutation:
mutation LaunchRun($executionParams: LaunchRunExecutionParam!) {
launchRun(executionParams: $executionParams) {
__typename
... on LaunchRunSuccess {
run { runId }
}
... on RunConfigValidationInvalid {
errors { message reason }
}
}
}
この構成の問題点:
- Consumer Worker が独立プロセスとして管理コストを増やす
- GraphQL API が中間の障害点になる
- Dagster の lineage tracking と consumer の状態が分離している
Phase 2: マルチジョブ fan-out + テーブル駆動
トピック数が増加し、条件分岐ロジックが膨らんだため、テーブル駆動設計に移行した。
TOPIC_JOB_MAP = {
"pipeline.knowledge.chat.persist": [
("knowledge_chat_persist", build_audit_run_config),
("knowledge_chat_pair_materialize", build_asset_run_config),
],
"pipeline.knowledge.embedding": [
("knowledge_embedding", build_event_run_config),
],
"pipeline.knowledge.retrieve": [
("knowledge_retrieve", build_event_run_config),
],
"pipeline.knowledge.compose": [
("knowledge_compose", build_event_run_config),
],
}
- トピック追加はテーブルへのエントリ追加のみ — コード複雑度は O(1)
- 各ジョブの設定ビルダ関数を独立管理
- fan-out(1 メッセージ → 複数ジョブ)を宣言的に表現
Phase 3: Dagster native sensor(現行)
GraphQL trigger bridge を排除し、Dagster の native sensor 機構に一本化した。
agent-gateway → NATS JetStream pub (fire-and-forget)
↓
Dagster daemon → sensor (pull subscribe, durable consumer)
↓
RunRequest → job 直接実行 → asset materialization
sensor が JetStream から直接 pull subscribe し、RunRequest を発行してジョブを実行する。Consumer Worker と GraphQL API が不要になり、コンポーネント数が減った。
現在稼働中の sensor 一覧:
| sensor | トピック | ジョブ |
|---|---|---|
| nats_dagster_chat_persist | pipeline.knowledge.chat.persist | knowledge_chat_persist + knowledge_chat_pair_materialize |
| nats_dagster_embedding | pipeline.knowledge.embedding | knowledge_embedding |
| nats_dagster_retrieve | pipeline.knowledge.retrieve | knowledge_retrieve |
| nats_dagster_compose | pipeline.knowledge.compose | knowledge_compose |
| nats_dagster_flow_lineage | pipeline.knowledge.flow.lineage | knowledge_flow_lineage |
| nats_dagster_tool_call | pipeline.knowledge.tool_call | knowledge_tool_call |
テーブル駆動のトピック → ジョブマッピングの考え方は sensor 設計にそのまま引き継がれている。
NATS メッセージスキーマ
チャット永続化メッセージ
{
"pipeline_id": "knowledge.chat.persist",
"correlation_id": "unique-trace-id",
"idempotency_key": "idempotency-unique-id",
"model": "llm-model-name",
"prompt": "user-provided-prompt-text",
"response": "llm-generated-response-text",
"messages": [{"role": "user", "content": "..."}],
"usage": {"prompt_tokens": 100, "completion_tokens": 50},
"requested_at": 1697845200000,
"payload_json": "additional structured data"
}
設計上のポイント
| フィールド | 目的 |
|---|---|
correlation_id | エンドツーエンドのトレーシング。gateway → NATS → Dagster → PostgreSQL → 分析クエリまで伝播 |
idempotency_key | 同一メッセージの重複処理を防止。PostgreSQL 側の UNIQUE 制約と組み合わせ |
payload_json | 拡張用。追加メタデータを JSONB で保存 |
JetStream ストリーム設定
| ストリーム | subjects | retention | max-age | storage |
|---|---|---|---|---|
| PIPELINE | pipeline.> | limits | 72h | file |
| TELEMETRY | telemetry.> | limits | 24h | file |
PIPELINE ストリームは Dagster sensor が消費する。TELEMETRY ストリームは Vector (Rust) が subscribe して Prometheus / Loki に流す。
PostgreSQL スキーマ設計
knowledge_chat_audit テーブル
チャット永続化の監査ログ。
CREATE TABLE knowledge_chat_audit (
id BIGSERIAL PRIMARY KEY,
pipeline_id TEXT NOT NULL,
correlation_id TEXT,
idempotency_key TEXT,
model TEXT,
prompt TEXT,
response TEXT,
payload_json JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
knowledge_chat_pairs テーブル
プロンプトとレスポンスをペアにした学習用データセット。knowledge_chat_audit が監査ログとして個々のリクエスト単位で記録するのに対し、このテーブルはプロンプトとレスポンスを対にしたペアデータとして永続化する。
CREATE TABLE knowledge_chat_pairs (
id BIGSERIAL PRIMARY KEY,
pair_id TEXT UNIQUE NOT NULL,
correlation_id TEXT,
pipeline_id TEXT,
llm_model TEXT,
prompt TEXT,
response TEXT,
prompt_tokens INT,
completion_tokens INT,
total_tokens INT,
payload_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
設計判断:
- pair_id の UNIQUE 制約 — deterministic な ID 生成により、同一ペアの重複投入を
ON CONFLICT DO NOTHINGで防ぐ - トークン数カラム — fine-tuning 用データセットとしての利用を想定し、トークン統計をペイロードから抽出して正規化カラムに保持
- audit テーブルとの分離 — 監査ログとペアデータセットは用途が異なるため、同一 NATS メッセージから2ジョブをファンアウトして並列に永続化する
knowledge_events テーブル(統一イベント)
embedding、retrieve、compose など複数の処理ステップが共有する統一イベントテーブル。
CREATE TABLE knowledge_events (
id BIGSERIAL PRIMARY KEY,
event_id TEXT UNIQUE, -- SHA256(event_type:correlation_id)[:32]
event_type TEXT NOT NULL, -- 'embedding' | 'retrieve' | 'compose'
correlation_id TEXT NOT NULL,
pipeline_id TEXT NOT NULL,
llm_model TEXT,
payload_json JSONB DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_knowledge_events_type ON knowledge_events (event_type);
CREATE INDEX idx_knowledge_events_corr ON knowledge_events (correlation_id);
CREATE INDEX idx_knowledge_events_date ON knowledge_events (created_at::date);
設計判断:
- event_id の UNIQUE 制約 — 冪等性の最終防衛線。同一イベントが複数回処理されても重複挿入されない
- event_type 判別カラム — テーブルを分散させず、1 テーブルに統合することで分析クエリを簡素化
- インデックス — event_type と correlation_id で高速フィルタリング、created_at::date で日次集計
冪等性とエラーハンドリング
冪等設計
| 層 | 手段 |
|---|---|
| NATS | JetStream の durable consumer。ACK されるまでメッセージを保持 |
| Dagster | sensor が RunRequest 発行前にメッセージ内容を検証 |
| PostgreSQL | idempotency_key / event_id の UNIQUE 制約で重複排除 |
すべての副作用は UPSERT (ON CONFLICT) か UNIQUE 制約で重複を吸収する。
エラーハンドリング
- NATS メッセージの ACK は Dagster ジョブ起動成功後に行う(Phase 1/2 では GraphQL 呼び出し成功後)
- 失敗したメッセージは JetStream の再配送キューに保持される
- Dagster 側のジョブ失敗は Dagit UI と slog ログの両方で追跡可能
- correlation_id がすべてのログに含まれるため、gateway → NATS → Dagster → PostgreSQL の全経路を 1 クエリで追跡できる
順序保証
embedding → retrieve → compose の順序は NATS レベルでは保証しない。Dagster Asset 依存関係で順序を制御し、correlation_id で事後的に関連イベントを結合・検証する。
Dagster Software-Defined Asset の設計
Knowledge モジュールの各操作は、Dagster の Software-Defined Asset をレコード(抽出)→ データセット(永続化)のペアとして定義している。
Chat Pair アセット
chat_pair_record と chat_pair_dataset の2つのアセットで構成される。
chat_pair_record: NATS ペイロードからの純粋な抽出。deterministic な pair_id を生成し、トークン数やモデルメタデータを付与する。入力の変換のみで副作用なしchat_pair_dataset:chat_pair_recordに依存する永続化レイヤー。knowledge_chat_pairsテーブルに INSERT し、ON CONFLICT DO NOTHINGで冪等性を保証
アセット依存関係がそのまま Dagit UI のリネージグラフになる。
Knowledge Event アセット
embedding、retrieve、compose の各操作も同じレコード → データセットパターンで定義している。
| アセットペア | 対象操作 | NATS トピック |
|---|---|---|
embedding_event_record → embedding_event_dataset | ベクトル生成(nomic-embed-text-v1.5) | pipeline.knowledge.embedding |
retrieve_event_record → retrieve_event_dataset | ベクトル検索 + reranking(ColBERT via FastAPI) | pipeline.knowledge.retrieve |
compose_event_record → compose_event_dataset | 複数レスポンスからの選択 | pipeline.knowledge.compose |
各レコードアセットは NATS ペイロードから event_id、event_type、correlation_id、pipeline_id、llm_model を抽出する。各データセットアセットはレコードに依存し、knowledge_events テーブルに永続化する。イベント種別ごとにテーブルを分けずに event_type カラムで区別する単一テーブル設計を選択した理由は、スキーマの増殖を防ぎつつペイロードの構造差を JSONB カラムに吸収できるためである。
ファンアウトパターン
NATS の pipeline.knowledge.chat.persist トピックに到着した1つのメッセージは、2つの Dagster ジョブを並列にトリガーする:
knowledge_chat_persist→knowledge_chat_auditテーブル(監査ログ)knowledge_chat_pair_materialize→chat_pair_record→chat_pair_dataset→knowledge_chat_pairsテーブル(ペアデータセット)
embedding、retrieve、compose はそれぞれ1トピック1ジョブの構成だが、トピック追加時にファンアウトが必要であればテーブル駆動マップにエントリを追加するだけで対応できる。
Go 側パイプラインブリッジ
Go 側(cmd/server/main.go)のブリッジもテーブル駆動パターンで統一した。
type pipelineRoute struct {
sourceTopic string
targetJob string
}
startPipelineToDagsterBridge() は複数の pipelineRoute を受け取り、startBridgeForRoute() が個別の goroutine として各 route の NATS subscribe → Dagster trigger を担当する。
新しい Knowledge 操作を追加する際のステップ:
- Go 側で NATS トピックに publish するコードを追加
- Python 側でアセットとジョブを定義
- Go 側の route テーブルと Python 側の
TOPIC_JOB_MAP(または sensor 定義)にエントリを追加
newPipelineService() は NATS 接続失敗時に mock repository にフォールバックする設計を維持している。開発中にすべてのインフラが起動していなくても gateway 単体で動作できる。
Knowledge モジュールの計装状況
| 操作 | 状態 | NATS トピック | 備考 |
|---|---|---|---|
Chat() | 計装済み | pipeline.knowledge.chat.persist | 監査ログ + ペアデータセット |
Embedding() | 計装済み | pipeline.knowledge.embedding | nomic-embed-text-v1.5 |
Retrieve() | 計装済み | pipeline.knowledge.retrieve | pgvector 実装は未完、ColBERT reranking あり |
Compose() | 計装済み | pipeline.knowledge.compose | 最小実装(最初の candidate を返す) |
Produce() | 未実装 | — | placeholder |
Plan() | 未実装 | — | placeholder |
Execute() | 未実装 | — | placeholder |
注意事項
- Phase 1/2 の GraphQL trigger bridge コードは参考実装として残っているが、現行の sensor 方式とは併用しない
- JetStream の max-age (72h) を超えた未消費メッセージは失われる。Dagster daemon が 72 時間以上停止した場合は手動リカバリが必要
knowledge_events.event_idの生成ロジック(SHA256(event_type:correlation_id)[:32])は、同一 correlation_id で同一 event_type のイベントが複数回発生するケースに注意が必要- PostgreSQL の init スクリプト(
020-create-chat-pairs.sql、030-create-knowledge-events.sql)は初回起動時のみ実行される。既存ボリュームでは手動で DDL を流す必要がある - Produce、Plan、Execute は未実装のまま。将来フェーズで計装対象に追加予定
検証
- Dagster UI で asset materialization 状態を確認: chat_pair, flow_lineage, tool_call が正常に Materialized
- NATS モニタリング (
:8222) で stream 状態と consumer lag を監視 - PostgreSQL で
SELECT * FROM knowledge_events WHERE correlation_id = '...'による全経路トレーシングを確認 - 冪等性テスト: 同一メッセージを複数回 publish し、PostgreSQL に重複レコードが生成されないことを検証
関連ファイル
この計装で変更・追加されたファイル:
| ファイル | 内容 |
|---|---|
devstack/dagster/project/assets/chat_pairs.py | chat_pair_record, chat_pair_dataset アセット |
devstack/dagster/project/assets/knowledge_events.py | embedding/retrieve/compose イベントアセット |
devstack/dagster/project/jobs/chat_pair_materialize.py | chat pair materialization ジョブ |
devstack/dagster/project/jobs/knowledge_events.py | embedding/retrieve/compose ジョブ |
devstack/dagster/project/defs.py | アセット・ジョブ登録 |
devstack/dagster/worker/nats_consumer.py | テーブル駆動ルーティング(Phase 2) |
devstack/postgres/docker-entrypoint-initdb.d/020-create-chat-pairs.sql | knowledge_chat_pairs DDL |
devstack/postgres/docker-entrypoint-initdb.d/030-create-knowledge-events.sql | knowledge_events DDL |
cmd/server/main.go | pipelineRoute 構造体、ブリッジ汎用化 |
次のアクション
現在 Phase 1(リアルタイム基盤)が稼働中。今後の拡張:
- Dagster Asset Materialization を活用したチャットペアの自動キュレーション
- pg → Parquet → Iceberg バッチ変換(Phase 2、devstack 済み)
- fine-tuning 用データセットの自動生成パイプライン
全体ロードマップはGo 基盤の設計全記録を参照。
