概要

AI推論システムにおいて、ユーザーのプロンプトとLLMの生成レスポンスを正確に記録し、後続の分析や再現に活用することは重要になってくる。本稿では、agent-gatewayプロジェクトで実装されたDagster + NATS統合システムについて、設計思想、実装上の課題、そして本番環境への展開を記述する。

背景:なぜイベント駆動パイプラインが必要か

従来のアプローチの限界

初期段階では、知識サービス(Knowledge Service)がチャットレスポンス直後にPostgreSQLへ同期的にデータを書き込んでいた。しかし、以下の課題が浮上した:

  1. レスポンス遅延:後処理がレスポンス時間に含まれる
  2. スケーラビリティの欠如:複数の後処理ステップが逐次実行される
  3. 可観測性の低さ:どの処理がいつ失敗したのか把握が困難
  4. 再現不可能な処理フロー:RAGロジックの改善や検証が難しい

解決策の方向性

これらの課題を解決するため、メッセージキュー(NATS)とワークフロー管理ツール(Dagster)の組み合わせを採用することにした。

アーキテクチャ設計

システム全体図

  Knowledge Service
    ↓ (NATS pub)
NATS Server (JetStream)
    ↓ (NATS sub)
NATS Consumer Worker
    ↓ (GraphQL)
Dagster GraphQL API
    ↓
Dagster Daemon/WebServer
    ↓ (execute ops)
PostgreSQL (audit logs)
  

コンポーネント間の通信フロー

  1. Knowledge ServiceがUIプロンプトとLLMレスポンスをペアでNATSに発行
  2. NATS Consumer Workerがメッセージを購読
  3. Dagster GraphQL APIを呼び出してジョブを実行
  4. Dagster OpsがデータをPostgreSQLに永続化

NATS メッセージフォーマット設計

チャット永続化メッセージの標準スキーマ:

  {
  "pipeline_id": "knowledge.chat.persist",
  "correlation_id": "unique-trace-id",
  "idempotency_key": "idempotency-unique-id",
  "model": "llm-model-name",
  "llm_model": "alternative-field-for-compatibility",
  "prompt": "user-provided-prompt-text",
  "response": "llm-generated-response-text",
  "messages": [{"role": "user", "content": "..."}],
  "usage": {"prompt_tokens": 100, "completion_tokens": 50},
  "requested_at": 1697845200000,
  "payload_json": "additional structured data"
}
  

設計上の考慮事項

  • correlation_id:エンドツーエンドのトレーシング用に、複数のパイプライン段階を通して伝播
  • idempotency_key:同じメッセージが複数回処理されてもべき等性を保証
  • payload_json:拡張性。追加メタデータやカスタムフィールドをJSONBで保存

Dagster パイプラインの実装

ジョブとオペレーション

  @job(name="knowledge_chat_persist")
def knowledge_chat_persist():
    persist_chat_to_pg(load_payload_from_config())
  

load_payload_from_config Op

  • NATSメッセージのJSON文字列を辞書に変換
  • デコード失敗時はフォールバック処理

persist_chat_to_pg Op

  CREATE TABLE knowledge_chat_audit (
  id BIGSERIAL PRIMARY KEY,
  pipeline_id TEXT NOT NULL,
  correlation_id TEXT,
  idempotency_key TEXT,
  model TEXT,
  prompt TEXT,
  response TEXT,
  payload_json JSONB NOT NULL DEFAULT '{}',
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
  

このテーブルに以下の処理で書き込み:

  1. テーブルが存在しない場合は作成(冪等)
  2. 設定値からcorrelation_id、idempotency_key、プロンプト、レスポンスを抽出
  3. PostgreSQLに監査レコードを挿入

リソース設計

PostgresResource:

  class PostgresResource(ConfigurableResource):
    dsn: str = "postgresql://postgres:postgres@postgres:5432/agent_gateway"
    
    def get_connection(self):
        actual_dsn = os.getenv("POSTGRES_DSN", self.dsn)
        return psycopg2.connect(actual_dsn)
  

環境変数による上書き可能な設計で、開発環境と本番環境での接続文字列を切り分け。

NATS Consumer Worker の実装

初期実装から拡張への進化

Phase 1: 単純な単一ジョブ路由

  async def message_handler(msg):
    payload = json.loads(msg.data.decode())
    submit_dagster_run(
        job_name="knowledge_chat_persist",
        payload=payload,
        correlation_id=payload.get("correlation_id", "")
    )
  

この単純な実装では、1つのトピックに1つのジョブが対応していた。

Phase 2: マルチジョブ・ファンアウト

処理要件の拡張により、同じメッセージから複数のジョブを起動する必要が生じた:

  if subject == "pipeline.knowledge.chat.persist":
    # 監査ログ
    submit_dagster_run("knowledge_chat_persist", ...)
    # ペアデータセット資産化
    submit_dagster_run("knowledge_chat_pair_materialize", ...)
  

Phase 3: テーブル駆動設計(最終版)

トピック数が増加するにつれ、スケーラビリティの問題が明白に。テーブル駆動アプローチに移行:

  TOPIC_JOB_MAP = {
    "pipeline.knowledge.chat.persist": [
        ("knowledge_chat_persist", build_audit_run_config),
        ("knowledge_chat_pair_materialize", build_asset_run_config),
    ],
    "pipeline.knowledge.embedding": [
        ("knowledge_embedding", build_event_run_config),
    ],
    "pipeline.knowledge.retrieve": [
        ("knowledge_retrieve", build_event_run_config),
    ],
    "pipeline.knowledge.compose": [
        ("knowledge_compose", build_event_run_config),
    ],
}
  

この設計により:

  • トピック数が増えても、コードの複雑度はO(1)に保持
  • 新しいトピック・ジョブマッピングは、テーブルへのエントリ追加のみ
  • 各ジョブの設定ビルダ関数を独立して管理可能

GraphQL APIを使用した非同期ジョブ起動

Dagster GraphQL Mutation:

  mutation LaunchRun($executionParams: LaunchRunExecutionParam!) {
  launchRun(executionParams: $executionParams) {
    __typename
    ... on LaunchRunSuccess {
      run { runId }
    }
    ... on RunConfigValidationInvalid {
      errors { message reason }
    }
  }
}
  

Python実装:

  def submit_dagster_run(job_name: str, run_config: dict, correlation_id: str):
    variables = {
        "executionParams": {
            "selector": {
                "repositoryLocationName": "dev_repo",
                "repositoryName": "__repository__",
                "jobName": job_name
            },
            "runConfigData": run_config,
            "mode": "default"
        }
    }
    response = requests.post(DAGSTER_GRAPHQL_URL, json={...})
    run_id = response.json()["data"]["launchRun"]["run"]["runId"]
    logger.info(f"Launched run {run_id}")
  

非同期メッセージ処理の要点

  async def message_handler(msg):
    # デコード、バリデーション
    payload = json.loads(msg.data.decode())
    correlation_id = payload.get("correlation_id", "")
    
    # トピック→ジョブマッピング
    routes = TOPIC_JOB_MAP.get(msg.subject)
    if not routes:
        logger.warning(f"No mapping for {msg.subject}")
        return
    
    # 複数ジョブへのファンアウト
    for job_name, build_config in routes:
        submit_dagster_run(
            job_name=job_name,
            run_config=build_config(payload, correlation_id),
            correlation_id=correlation_id
        )

async def main():
    nc = NATS()
    await nc.connect(NATS_URL)
    
    # 全トピックをサブスクライブ
    for topic in TOPIC_JOB_MAP:
        await nc.subscribe(topic, cb=message_handler)
  

PostgreSQL スキーマ設計

knowledge_chat_audit テーブル

チャット永続化の監査ログ。相関ID、べき等性キー、モデル情報をトラッキング。

knowledge_events テーブル(拡張版)

複数の知識処理ステップ(embedding、retrieve、compose)が共有する統一イベントテーブル:

  CREATE TABLE knowledge_events (
    id             BIGSERIAL PRIMARY KEY,
    event_id       TEXT UNIQUE,          -- SHA256(event_type:correlation_id)[:32]
    event_type     TEXT NOT NULL,        -- 'embedding' | 'retrieve' | 'compose'
    correlation_id TEXT NOT NULL,
    pipeline_id    TEXT NOT NULL,
    llm_model      TEXT,
    payload_json   JSONB DEFAULT '{}'::jsonb,
    created_at     TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_knowledge_events_type ON knowledge_events (event_type);
CREATE INDEX idx_knowledge_events_corr ON knowledge_events (correlation_id);
CREATE INDEX idx_knowledge_events_date ON knowledge_events (created_at::date);
  
  • event_id:UNIQUEコンストレイントにより重複処理を防止
  • event_type:判別カラムにより、複数のジョブタイプが同一テーブルに書き込み可能
  • correlation_id:複数テーブル間でのトレーシング
  • インデックス:event_typeとcorrelation_idで高速検索、created_atで日付単位の集計を加速

Docker Compose での統合

  services:
  nats:
    image: nats:2.11-alpine
    command: ["-js"]
    ports: ["4222:4222", "8222:8222"]
  
  postgres:
    image: agent-gateway/postgres:18-jit-vector
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: agent_gateway
  
  dagster-user-code:
    environment:
      POSTGRES_DSN: postgresql://postgres:postgres@postgres:5432/agent_gateway
    command: [dagster, api, grpc, -h, 0.0.0.0, -p, 4000]
  
  dagster-webserver:
    environment:
      POSTGRES_DSN: postgresql://postgres:postgres@postgres:5432/agent_gateway
    command: [dagster-webserver, -h, 0.0.0.0, -p, 3000]
  
  dagster-daemon:
    command: [dagster-daemon, run]
  

NATS Consumer WorkerはKubernetesまたはSystemdサービスとして独立して実行。

実装上の課題と解決策

課題1: 順序保証の欠如

問題:複数の知識処理ステップ(embedding → retrieve → compose)が非同期で実行される場合、実行順序が保証されない。

解決策

  • Dagster Asset依存関係を活用して、asset化したチャットペアデータに依存する後処理を定義
  • correlation_idで関連イベントを結合し、事後的に順序を検証

課題2: べき等性の保証

問題:ネットワーク障害によるメッセージの再送で、同じデータが複数回挿入される可能性。

解決策

  • idempotency_keyをNATSメッセージに含める
  • PostgreSQL UPSERT(ON CONFLICT)またはevent_idのUNIQUEコンストレイントで重複を排除

課題3: エラーハンドリングと再試行

問題:Dagster GraphQL呼び出しが失敗した場合、メッセージが喪失される可能性。

解決策

  • NATS メッセージを明示的にACKするタイミングを遅延(Dagster起動成功後)
  • 失敗したメッセージはNATS JetStreamの再試行キューに保持
  • Dagster失敗時のログ出力で、手動介入の契機を提供

本番運用の工夫

監視とアラート

  logger.info(f"Submitted job {job_name} to Dagster (correlation_id={correlation_id})")
logger.error(f"Failed to launch run: {launch_result}")
  

correlation_idをログに含めることで、ユーザー操作からパイプライン実行まで全て追跡可能。

トレーシング

correlation_idは以下の箇所で伝播:

  1. Knowledge Service → NATS メッセージ
  2. NATS → Dagster run_config
  3. Dagster → PostgreSQL (correlation_id カラム)
  4. PostgreSQL → 分析クエリ(「このユーザーの操作は全体どのように処理されたか」を検索)

得られた知見

1. 非同期パイプラインの設計

イベント駆動アーキテクチャでは、メッセージスキーマの設計が極めて重要。correlation_idのような追跡情報を早期に盛り込むことで、後の運用負荷を大幅に削減できる。

2. スケーラビリティと保守性

初期的な条件分岐ロジックから、テーブル駆動設計への進化は、コード複雑度の増加を抑制しつつ、機能を拡張するための実践的なパターンを示唆する。

3. データベース設計での統一性

複数のイベント種別が異なるテーブルに分散していると、統合分析が困難。共有イベントテーブルに統合し、event_type判別カラムで多態性を実現する設計が有効。

4. GraphQL APIの活用

REST APIと異なり、GraphQL APIは必要なフィールドのみ取得できるため、ペイロード削減とレスポンス高速化に貢献。また、mutation の戻り値で実行結果を即座に検証でき、エラーハンドリングが容易。

まとめ

Dagster + NATS統合により、以下を実現した:

  1. 非同期処理:ユーザー向けレスポンスと後処理を分離し、レスポンス遅延を大幅に削減
  2. スケーラビリティ:テーブル駆動設計で、ジョブ数の増加に対応
  3. 可観測性:correlation_idとloggingにより、全処理フロー追跡可能
  4. 再現性:Dagster lineageにより、RAG処理が再現可能なassetとして記録

次のステップとしては、Dagster Asset Materializationを活用した、チャットペアの自動キュレーション、さらには微調整データセットの自動生成が考えられる。