背景

LLMを業務パイプラインに組み込むには、単純なRequest-Responseでは足りないことが多い。複数LLMの並列推論、SSEによる即応フィードバック、冪等なリトライ、トレーサビリティが必要。

Rust(axum)で薄型OpenAI互換プロキシを作り、NATS Coreでイベント中継し、Dagster oneshotジョブで重い処理を実行する「AI知能工場」を設計した。最終的にGoへ全面移行した経緯も含めて記録する。

コンポーネント責務

レイヤー技術責務
エントリポイントRust/Go + axumOpenAI互換EP、trace_id採番、SSE保持
メッセージングNATS CorePub/Subイベント中継(evt.chat.{trace_id})
実行エンジンDagster + systemdoneshot型ジョブ実行(常駐しない)
オブザーバビリティLoki + Grafana + Dagittrace_id付きログ、Lineage追跡
キャッシュ/保存PostgreSQL + Qdrant冪等ログ + セマンティックキャッシュ

通信フロー

  1. クライアント → Rust API: /v1/chat/completions (stream=true)。trace_id発行しSSE開始
  2. Rust → Dagster: systemd/Quadletでoneshot起動(dagster-<job>@{trace_id}
  3. Dagster (op/pipeline): ローカルLLM並列実行。トークンや進捗をevt.chat.{trace_id}でNATSにpublish。成果物コミット後にfinishedを一度だけ送信
  4. Rust API: subscribe("evt.chat.{trace_id}")でNATS購読、OpenAI互換chunkに整形してSSEに流す
  5. 保存: ストリーム完了時にPG/QdrantへUPSERT

イベントスキーマ

  {"type":"role","role":"assistant"}
{"type":"token","text":"...","task":"A"}
{"type":"tool_call","name":"search","arguments":"{...}"}
{"type":"usage","usage":{...}}
{"type":"finished","reason":"stop","winner":"llama3.1-8b"}
  

Rust APIがこれをOpenAI互換のdata: {...}\n\n形式に変換してSSEで送信。

冪等・リトライ設計

  • req_id = sha256(model + messages + params)。同一リクエストの重複実行を吸収
  • PGスキーマ: idempotency_log(key PK, status, result, updated_at) + completions_cache(req_id PK, model, content, usage, created_at)
  • 全副作用はUPSERT/unique制約で重複吸収
  • ネットワーク系は指数バックオフ + 最大試行回数
  • finishedは成果物コミット後に一度だけ

OpenAI互換プロキシ拡張

標準OpenAI APIにx_フィールドで独自機能を注入:

  • x_route: direct(素通し)/ rag(知識検索)/ workflow(非同期ジョブ)
  • x_adapter: LoRAアダプタ指定
  • x_project: プロジェクト識別子
  • 透過的RAG: Qdrant/PGから文脈を自動挿入(クライアントは意識しない)

NATS Core → JetStream移行パス

初期はNATS Coreで最薄構成。喪失容認 + 冪等で堅牢化。

JetStream移行時の差分:

  • REQストリーム追加: Rust→js.publish("req.chat", {...})、Dagsterはpull consumerでfetch→ack。確実受付・再試行
  • EVTはpush consumer: deliver_subjectを割当て、push配信をSSEへ即中継

Quadlet/systemd統合

  # [email protected] (Type=oneshot)
ContainerEnv=TRACE_ID=%i
AutoRemove=yes
Restart=on-failure
After=nats.service
  

Rust側からsystemd-run --user -u dagster-debate@{trace_id}で起動。loginctl enable-linger ksh3でユーザーsystemdを常駐。

ディレクトリ構成

  /dagster
  /systemd      # Quadletテンプレート
  /entrypoints  # run_xxx.py(TRACE_IDを受け取って実行)
  /repo          # ops/jobs/assets/pipelines + definitions.py
  /instance      # DAGSTER_HOME共有
  /fn            # 共通関数(NATS publish, PG/Qdrant I/O等)
  

RustからGoへの移行

移行の理由

Rustで設計した構成は精密だが、大量の非同期処理(SSE中継、NATS Pub/Sub、並列検索)の記述コストが高く、柔軟性とのバランスが悪かった。

  • SSEの中継 + NATSの購読 + PG/Qdrantへの書き込みが同時に走る非同期コンテキストの管理がRustでは冗長
  • goroutine + channelの方がこのパターンに素直にフィットする
  • 実行時オーバーヘッドの差はこのユースケースでは無視できるレベル

Goで保持した設計原則

  • OpenAI互換エンドポイント
  • NATS Pub/Subによるイベント駆動
  • Dagster oneshotジョブの起動
  • trace_id付きの全ログ
  • 冪等設計(PG UPSERT)

Rust版の設計ドキュメントはそのまま「仕様書」として価値がある。Go実装はRust設計の「簡潔な翻訳」。

運用ノブ

  • 並列度: Dagster Dynamic Mapping + run tags、またはsystemd同時Start制限
  • タイムアウト: EP全体/ジョブ個別のTimeout設定
  • キャッシュ閾値: Qdrant類似度 >= 0.92で「暫定応答」制御
  • キャンセル: クライアント切断でcancel.{trace_id}をNATSにpublish → op側で早期終了

観測

  • ログ: JSONでtrace_id必須。Promtail→Loki→Grafanaで{trace_id="..."}|json検索
  • メトリクス: SSEレイテンシ、ドロップ率、oneshot成功率をダッシュボード化
  • Dagit: 共通DAGSTER_HOME/Run Storage(Postgres推奨)で過去Run/Lineage閲覧