結論

Go + NATS + Dagster AI オーケストレーション基盤の中で、Dagster + NATS JetStream によるイベントパイプラインは gateway と完全に分離された消費側の実装である。この記事ではその消費側の実装詳細を掘り下げる。

設計の核心は「gateway は publish only、Dagster sensor が独立に pull subscribe する」という責務分離にある。初期は NATS Consumer Worker + GraphQL API trigger bridge で実装したが、最終的に Dagster native sensor に一本化した。テーブル駆動のトピック → ジョブマッピング、PostgreSQL の冪等スキーマ、correlation_id による全経路トレーシングがこのパイプラインの骨格になっている。

Knowledge モジュールの主要操作——Chat、Embedding、Retrieve、Compose——はすべて NATS → Dagster のパイプラインとして計装されている。各操作は Dagster Software-Defined Asset のペア(record → dataset)として定義され、Dagit UI 上でリネージグラフとして可視化される。Go 側のブリッジも pipelineRoute 構造体によるテーブル駆動設計で統一されている。


前提

この記事は以下の基盤上で動作するパイプラインの詳解である。全体アーキテクチャはGo 基盤の設計全記録を参照。

  • メッセージング: NATS 2.11 (JetStream 有効)
  • オーケストレーション: Dagster (Python) — daemon + sensor + asset materialization
  • データストア: PostgreSQL 18 + pgvector (JIT 有効)
  • イベント発行元: agent-gateway (Go/Gin) — fire-and-forget publish

なぜイベント駆動にしたか

初期段階では Knowledge Service がチャットレスポンス直後に PostgreSQL へ同期書き込みしていた。以下の問題が顕在化した。

問題影響
レスポンス遅延後処理がユーザー向けレスポンス時間に含まれる
スケーラビリティ複数の後処理ステップが逐次実行される
可観測性どの処理がいつ失敗したか把握困難
再現性RAG ロジックの検証・改善が困難

解決策として、gateway はイベントを publish するだけにし、消費側(Dagster)が独立に処理する構成に切り替えた。これによりレスポンス遅延はゼロになり、後処理の追加・変更が gateway に影響しなくなった。


Consumer アーキテクチャの進化

Phase 1: NATS Consumer Worker + GraphQL bridge

初期実装では、独立した Python プロセスが NATS を subscribe し、Dagster GraphQL API を呼び出してジョブを起動していた。

  agent-gateway → NATS pub → Consumer Worker → GraphQL mutation → Dagster job
  
  async def message_handler(msg):
    payload = json.loads(msg.data.decode())
    submit_dagster_run(
        job_name="knowledge_chat_persist",
        payload=payload,
        correlation_id=payload.get("correlation_id", "")
    )
  

GraphQL mutation:

  mutation LaunchRun($executionParams: LaunchRunExecutionParam!) {
  launchRun(executionParams: $executionParams) {
    __typename
    ... on LaunchRunSuccess {
      run { runId }
    }
    ... on RunConfigValidationInvalid {
      errors { message reason }
    }
  }
}
  

この構成の問題点:

  • Consumer Worker が独立プロセスとして管理コストを増やす
  • GraphQL API が中間の障害点になる
  • Dagster の lineage tracking と consumer の状態が分離している

Phase 2: マルチジョブ fan-out + テーブル駆動

トピック数が増加し、条件分岐ロジックが膨らんだため、テーブル駆動設計に移行した。

  TOPIC_JOB_MAP = {
    "pipeline.knowledge.chat.persist": [
        ("knowledge_chat_persist", build_audit_run_config),
        ("knowledge_chat_pair_materialize", build_asset_run_config),
    ],
    "pipeline.knowledge.embedding": [
        ("knowledge_embedding", build_event_run_config),
    ],
    "pipeline.knowledge.retrieve": [
        ("knowledge_retrieve", build_event_run_config),
    ],
    "pipeline.knowledge.compose": [
        ("knowledge_compose", build_event_run_config),
    ],
}
  
  • トピック追加はテーブルへのエントリ追加のみ — コード複雑度は O(1)
  • 各ジョブの設定ビルダ関数を独立管理
  • fan-out(1 メッセージ → 複数ジョブ)を宣言的に表現

Phase 3: Dagster native sensor(現行)

GraphQL trigger bridge を排除し、Dagster の native sensor 機構に一本化した。

  agent-gateway → NATS JetStream pub (fire-and-forget)
                    ↓
Dagster daemon → sensor (pull subscribe, durable consumer)
                    ↓
                RunRequest → job 直接実行 → asset materialization
  

sensor が JetStream から直接 pull subscribe し、RunRequest を発行してジョブを実行する。Consumer Worker と GraphQL API が不要になり、コンポーネント数が減った。

現在稼働中の sensor 一覧:

sensorトピックジョブ
nats_dagster_chat_persistpipeline.knowledge.chat.persistknowledge_chat_persist + knowledge_chat_pair_materialize
nats_dagster_embeddingpipeline.knowledge.embeddingknowledge_embedding
nats_dagster_retrievepipeline.knowledge.retrieveknowledge_retrieve
nats_dagster_composepipeline.knowledge.composeknowledge_compose
nats_dagster_flow_lineagepipeline.knowledge.flow.lineageknowledge_flow_lineage
nats_dagster_tool_callpipeline.knowledge.tool_callknowledge_tool_call

テーブル駆動のトピック → ジョブマッピングの考え方は sensor 設計にそのまま引き継がれている。


NATS メッセージスキーマ

チャット永続化メッセージ

  {
  "pipeline_id": "knowledge.chat.persist",
  "correlation_id": "unique-trace-id",
  "idempotency_key": "idempotency-unique-id",
  "model": "llm-model-name",
  "prompt": "user-provided-prompt-text",
  "response": "llm-generated-response-text",
  "messages": [{"role": "user", "content": "..."}],
  "usage": {"prompt_tokens": 100, "completion_tokens": 50},
  "requested_at": 1697845200000,
  "payload_json": "additional structured data"
}
  

設計上のポイント

フィールド目的
correlation_idエンドツーエンドのトレーシング。gateway → NATS → Dagster → PostgreSQL → 分析クエリまで伝播
idempotency_key同一メッセージの重複処理を防止。PostgreSQL 側の UNIQUE 制約と組み合わせ
payload_json拡張用。追加メタデータを JSONB で保存

JetStream ストリーム設定

ストリームsubjectsretentionmax-agestorage
PIPELINEpipeline.>limits72hfile
TELEMETRYtelemetry.>limits24hfile

PIPELINE ストリームは Dagster sensor が消費する。TELEMETRY ストリームは Vector (Rust) が subscribe して Prometheus / Loki に流す。


PostgreSQL スキーマ設計

knowledge_chat_audit テーブル

チャット永続化の監査ログ。

  CREATE TABLE knowledge_chat_audit (
  id BIGSERIAL PRIMARY KEY,
  pipeline_id TEXT NOT NULL,
  correlation_id TEXT,
  idempotency_key TEXT,
  model TEXT,
  prompt TEXT,
  response TEXT,
  payload_json JSONB NOT NULL DEFAULT '{}',
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
  

knowledge_chat_pairs テーブル

プロンプトとレスポンスをペアにした学習用データセット。knowledge_chat_audit が監査ログとして個々のリクエスト単位で記録するのに対し、このテーブルはプロンプトとレスポンスを対にしたペアデータとして永続化する。

  CREATE TABLE knowledge_chat_pairs (
  id BIGSERIAL PRIMARY KEY,
  pair_id TEXT UNIQUE NOT NULL,
  correlation_id TEXT,
  pipeline_id TEXT,
  llm_model TEXT,
  prompt TEXT,
  response TEXT,
  prompt_tokens INT,
  completion_tokens INT,
  total_tokens INT,
  payload_json JSONB NOT NULL DEFAULT '{}'::jsonb,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
  

設計判断:

  • pair_id の UNIQUE 制約 — deterministic な ID 生成により、同一ペアの重複投入を ON CONFLICT DO NOTHING で防ぐ
  • トークン数カラム — fine-tuning 用データセットとしての利用を想定し、トークン統計をペイロードから抽出して正規化カラムに保持
  • audit テーブルとの分離 — 監査ログとペアデータセットは用途が異なるため、同一 NATS メッセージから2ジョブをファンアウトして並列に永続化する

knowledge_events テーブル(統一イベント)

embedding、retrieve、compose など複数の処理ステップが共有する統一イベントテーブル。

  CREATE TABLE knowledge_events (
    id             BIGSERIAL PRIMARY KEY,
    event_id       TEXT UNIQUE,          -- SHA256(event_type:correlation_id)[:32]
    event_type     TEXT NOT NULL,        -- 'embedding' | 'retrieve' | 'compose'
    correlation_id TEXT NOT NULL,
    pipeline_id    TEXT NOT NULL,
    llm_model      TEXT,
    payload_json   JSONB DEFAULT '{}'::jsonb,
    created_at     TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_knowledge_events_type ON knowledge_events (event_type);
CREATE INDEX idx_knowledge_events_corr ON knowledge_events (correlation_id);
CREATE INDEX idx_knowledge_events_date ON knowledge_events (created_at::date);
  

設計判断:

  • event_id の UNIQUE 制約 — 冪等性の最終防衛線。同一イベントが複数回処理されても重複挿入されない
  • event_type 判別カラム — テーブルを分散させず、1 テーブルに統合することで分析クエリを簡素化
  • インデックス — event_type と correlation_id で高速フィルタリング、created_at::date で日次集計

冪等性とエラーハンドリング

冪等設計

手段
NATSJetStream の durable consumer。ACK されるまでメッセージを保持
Dagstersensor が RunRequest 発行前にメッセージ内容を検証
PostgreSQLidempotency_key / event_id の UNIQUE 制約で重複排除

すべての副作用は UPSERT (ON CONFLICT) か UNIQUE 制約で重複を吸収する。

エラーハンドリング

  • NATS メッセージの ACK は Dagster ジョブ起動成功後に行う(Phase 1/2 では GraphQL 呼び出し成功後)
  • 失敗したメッセージは JetStream の再配送キューに保持される
  • Dagster 側のジョブ失敗は Dagit UI と slog ログの両方で追跡可能
  • correlation_id がすべてのログに含まれるため、gateway → NATS → Dagster → PostgreSQL の全経路を 1 クエリで追跡できる

順序保証

embedding → retrieve → compose の順序は NATS レベルでは保証しない。Dagster Asset 依存関係で順序を制御し、correlation_id で事後的に関連イベントを結合・検証する。


Dagster Software-Defined Asset の設計

Knowledge モジュールの各操作は、Dagster の Software-Defined Asset をレコード(抽出)→ データセット(永続化)のペアとして定義している。

Chat Pair アセット

chat_pair_recordchat_pair_dataset の2つのアセットで構成される。

  • chat_pair_record: NATS ペイロードからの純粋な抽出。deterministic な pair_id を生成し、トークン数やモデルメタデータを付与する。入力の変換のみで副作用なし
  • chat_pair_dataset: chat_pair_record に依存する永続化レイヤー。knowledge_chat_pairs テーブルに INSERT し、ON CONFLICT DO NOTHING で冪等性を保証

アセット依存関係がそのまま Dagit UI のリネージグラフになる。

Knowledge Event アセット

embedding、retrieve、compose の各操作も同じレコード → データセットパターンで定義している。

アセットペア対象操作NATS トピック
embedding_event_recordembedding_event_datasetベクトル生成(nomic-embed-text-v1.5)pipeline.knowledge.embedding
retrieve_event_recordretrieve_event_datasetベクトル検索 + reranking(ColBERT via FastAPI)pipeline.knowledge.retrieve
compose_event_recordcompose_event_dataset複数レスポンスからの選択pipeline.knowledge.compose

各レコードアセットは NATS ペイロードから event_idevent_typecorrelation_idpipeline_idllm_model を抽出する。各データセットアセットはレコードに依存し、knowledge_events テーブルに永続化する。イベント種別ごとにテーブルを分けずに event_type カラムで区別する単一テーブル設計を選択した理由は、スキーマの増殖を防ぎつつペイロードの構造差を JSONB カラムに吸収できるためである。

ファンアウトパターン

NATS の pipeline.knowledge.chat.persist トピックに到着した1つのメッセージは、2つの Dagster ジョブを並列にトリガーする:

  1. knowledge_chat_persistknowledge_chat_audit テーブル(監査ログ)
  2. knowledge_chat_pair_materializechat_pair_recordchat_pair_datasetknowledge_chat_pairs テーブル(ペアデータセット)

embedding、retrieve、compose はそれぞれ1トピック1ジョブの構成だが、トピック追加時にファンアウトが必要であればテーブル駆動マップにエントリを追加するだけで対応できる。


Go 側パイプラインブリッジ

Go 側(cmd/server/main.go)のブリッジもテーブル駆動パターンで統一した。

  type pipelineRoute struct {
    sourceTopic string
    targetJob   string
}
  

startPipelineToDagsterBridge() は複数の pipelineRoute を受け取り、startBridgeForRoute() が個別の goroutine として各 route の NATS subscribe → Dagster trigger を担当する。

新しい Knowledge 操作を追加する際のステップ:

  1. Go 側で NATS トピックに publish するコードを追加
  2. Python 側でアセットとジョブを定義
  3. Go 側の route テーブルと Python 側の TOPIC_JOB_MAP(または sensor 定義)にエントリを追加

newPipelineService() は NATS 接続失敗時に mock repository にフォールバックする設計を維持している。開発中にすべてのインフラが起動していなくても gateway 単体で動作できる。


Knowledge モジュールの計装状況

操作状態NATS トピック備考
Chat()計装済みpipeline.knowledge.chat.persist監査ログ + ペアデータセット
Embedding()計装済みpipeline.knowledge.embeddingnomic-embed-text-v1.5
Retrieve()計装済みpipeline.knowledge.retrievepgvector 実装は未完、ColBERT reranking あり
Compose()計装済みpipeline.knowledge.compose最小実装(最初の candidate を返す)
Produce()未実装placeholder
Plan()未実装placeholder
Execute()未実装placeholder

注意事項

  • Phase 1/2 の GraphQL trigger bridge コードは参考実装として残っているが、現行の sensor 方式とは併用しない
  • JetStream の max-age (72h) を超えた未消費メッセージは失われる。Dagster daemon が 72 時間以上停止した場合は手動リカバリが必要
  • knowledge_events.event_id の生成ロジック(SHA256(event_type:correlation_id)[:32])は、同一 correlation_id で同一 event_type のイベントが複数回発生するケースに注意が必要
  • PostgreSQL の init スクリプト(020-create-chat-pairs.sql030-create-knowledge-events.sql)は初回起動時のみ実行される。既存ボリュームでは手動で DDL を流す必要がある
  • Produce、Plan、Execute は未実装のまま。将来フェーズで計装対象に追加予定

検証

  • Dagster UI で asset materialization 状態を確認: chat_pair, flow_lineage, tool_call が正常に Materialized
  • NATS モニタリング (:8222) で stream 状態と consumer lag を監視
  • PostgreSQL で SELECT * FROM knowledge_events WHERE correlation_id = '...' による全経路トレーシングを確認
  • 冪等性テスト: 同一メッセージを複数回 publish し、PostgreSQL に重複レコードが生成されないことを検証

関連ファイル

この計装で変更・追加されたファイル:

ファイル内容
devstack/dagster/project/assets/chat_pairs.pychat_pair_record, chat_pair_dataset アセット
devstack/dagster/project/assets/knowledge_events.pyembedding/retrieve/compose イベントアセット
devstack/dagster/project/jobs/chat_pair_materialize.pychat pair materialization ジョブ
devstack/dagster/project/jobs/knowledge_events.pyembedding/retrieve/compose ジョブ
devstack/dagster/project/defs.pyアセット・ジョブ登録
devstack/dagster/worker/nats_consumer.pyテーブル駆動ルーティング(Phase 2)
devstack/postgres/docker-entrypoint-initdb.d/020-create-chat-pairs.sqlknowledge_chat_pairs DDL
devstack/postgres/docker-entrypoint-initdb.d/030-create-knowledge-events.sqlknowledge_events DDL
cmd/server/main.gopipelineRoute 構造体、ブリッジ汎用化

次のアクション

現在 Phase 1(リアルタイム基盤)が稼働中。今後の拡張:

  • Dagster Asset Materialization を活用したチャットペアの自動キュレーション
  • pg → Parquet → Iceberg バッチ変換(Phase 2、devstack 済み)
  • fine-tuning 用データセットの自動生成パイプライン

全体ロードマップはGo 基盤の設計全記録を参照。