結論

agent-gateway のパイプラインアーキテクチャを v3 としてリデザインした。核心は pipeline.knowledge.* という単一ドメインを llmobsidianmlflow の3ドメインに分割したこと。NATS subject、Dagster sensor/job/asset、Vector テレメトリルーティングの全てをこの3分類に揃えた。

開発段階でプロダクションデータが無い状態で実施したため、後方互換性は一切考えず、旧トピック・旧 durable consumer は単純削除とした。6ワークストリーム + MinIO 外部化で計8コミット。Go 側は各ワークストリームで go build ./... を通過、Dagster 側はインポート検証済み。

リデザインの動機は明確で、Obsidian vault 連携や MLflow 実験管理を追加しようとした時点で「knowledge」というラベルではドメイン境界が曖昧になり、Dagster の sensor ルーティングもテレメトリの分類も、何が何のためのイベントなのか名前から判別できなくなっていた。

この記事はGo + NATS + Dagster によるAIオーケストレーション基盤の続編にあたる。


前提

  • 対象システム: agent-gateway — Go + Python 構成の自作 LLM オーケストレーション基盤
  • ランタイム: EPYC 9175F (768GB DDR5, RTX 6000 Pro Max-Q)、Podman rootless
  • メッセージング: NATS 2.11 (JetStream)
  • オーケストレーション: Dagster (Python)
  • テレメトリ: Vector (Rust) → Prometheus / Loki
  • 新規統合対象: Obsidian vault、MLflow Model Registry
  • 設計ドキュメント: docs/2nd-design-archi.md

設計判断

ドメイン分割の方針

旧設計では全パイプラインが pipeline.knowledge.* に集約されていた。chat persist も embedding も tool call も同一の「knowledge」ラベルの下にあり、以下の問題が生じていた。

  • Obsidian vault 操作を追加する際、pipeline.knowledge.embedding が「チャット用の embedding なのか vault indexing 用なのか」判別不能
  • MLflow 実験管理のイベントを knowledge ドメインに入れるのは意味的に破綻
  • Dagster sensor のルーティングロジックが subject 名から意図を読み取れない
  • テレメトリの分類が機能単位ではなくなっていた

分割後の3ドメイン:

ドメイン責務NATS subject prefix
llmチャット永続化、tool call、flow lineagepipeline.llm.*
obsidianvault 操作、semantic search、indexingpipeline.obsidian.*
mlflow実験 run lifecycle、model registry webhookpipeline.mlflow.*

テレメトリも同様に telemetry.llm.* / telemetry.obsidian.* / telemetry.mlflow.* に分離。

後方互換性の排除

開発段階でプロダクションデータが無いため、マイグレーションロジックは書かない方針とした。旧トピック・旧 durable consumer は単純削除。この判断により実装コストを大幅に削減できた。

ワークストリーム構成

6ワークストリームを依存順に並べ、各 WS を1コミットとするチェックポイント方式で進めた。

  WS1 (Lakehouse簡素化) → WS2 (NATSトピック移行) → WS3 + WS4 + WS5 (並行) → WS6 (Vector config)
  

WS3 (Go domain 層) / WS4 (Go transport 層) / WS5 (Python Dagster) はレイヤーが異なるため並行可能。


実装

WS1: Lakehouse 簡素化

Nessie + Trino + dbt-fusion のスタックは以前の ELT 最適化検証で導入したものだが、現時点では PyIceberg + JDBC catalog で十分と判断し削除した。

変更内容:

  • podman-compose.yml から nessie、trino、dbt-fusion サービスと trino-warehouse ボリュームを削除
  • internal/config/config.go から TrinoBaseURLNessieBaseURL フィールドと env var ロードを削除
  • devstack/postgres/init.sql に Iceberg JDBC catalog メタデータテーブル (iceberg_tablesiceberg_namespace_properties) を追加
  • devstack/trino/ ディレクトリを丸ごと削除

WS2: NATS トピック移行

pipeline.knowledge.* の全トピック定数を新ドメイン別名前空間に一括置換した。5ファイルにまたがる変更。

旧トピック新トピック
pipeline.knowledge.chat.persistpipeline.llm.chat.persist
pipeline.knowledge.embeddingpipeline.obsidian.semantic_search
pipeline.knowledge.retrievepipeline.llm.retrieve
pipeline.knowledge.composepipeline.llm.compose
pipeline.knowledge.flow.lineagepipeline.llm.flow.lineage
pipeline.knowledge.tool_callpipeline.llm.tool_call

テレメトリトピック (telemetry.knowledge.*telemetry.llm.* / telemetry.obsidian.*) と DAG 名 (knowledge.prompt_flowllm.prompt_flow) も同時に更新。

grep "pipeline\.knowledge\|telemetry\.knowledge"internal/ 配下に旧トピックの残留が無いことを確認。

WS3: domain/obsidian 新規作成

internal/domain/obsidian/ に3ファイルを新規作成した。

repository.goVaultRepository インターフェースとローカルファイルシステム実装 LocalVault。path traversal 防止の resolve() メソッドで vault root 外へのアクセスを拒否する。

service.goService struct に5メソッド:

メソッド責務
SemanticSearchvault 内の意味検索
ReadNoteノート読み取り
WriteNoteノート書き込み
ReviseNoteノート改訂
Proofread校正

LLM は直接呼ばず knowledge orchestrator 経由とし、obsidian ドメインは vault I/O と embedding に専念する設計。各操作で pipeline.obsidian.* にイベントを publish、telemetry.obsidian.* にテレメトリを送る。

indexer.go — vault 走査 → チャンク分割 → embedding → pgvector upsert のパイプライン。

  vault files → scan → chunk (512 rune, 64 overlap) → embed → pgvector upsert
  

internal/config/config.goObsidianVaultDir フィールドと OBSIDIAN_VAULT_DIR env var を追加。cmd/server/main.goobsidian.NewService を初期化するワイヤリングを追加(transport 層への接続は未実装)。

WS4: MLflow webhook エンドポイント

Go の transport 層に MLflow 統合用の2ハンドラを新規作成した。

webhook_mlflow.goPOST /v1/webhook/mlflow

MLflow Model Registry の webhook リクエストを受け取り、X-Webhook-Signature ヘッダで HMAC-SHA256 検証を行い、イベントタイプに応じて NATS に publish する。

イベントタイプNATS subject
model registeredpipeline.mlflow.model.registered
version createdpipeline.mlflow.model.version_created
alias setpipeline.mlflow.model.alias_set
tag setpipeline.mlflow.model.tag_set

pipeline_mlflow.go — 2エンドポイント:

エンドポイントNATS subject
POST /v1/pipeline/mlflow/runpipeline.mlflow.run.{started,completed,failed}
POST /v1/pipeline/mlflow/metricspipeline.mlflow.run.metrics

server.goWithPipelineService ServerOption を追加し、既存の WithConversationRepository パターンに倣って DI を実装。config.goMLflowWebhookSecret (MLFLOW_WEBHOOK_SECRET env var) を追加。

WS5: Dagster sensor/job/asset 再構成

最大の変更量: 16ファイル、+401行 / -229行。

既存の6個の per-topic sensor を4個の wildcard sensor に書き換えた。

旧 sensor新 sensorsubject
nats_dagster_chat_persistnats_dagster_llmpipeline.llm.>
nats_dagster_embeddingnats_dagster_obsidianpipeline.obsidian.>
nats_dagster_retrievenats_dagster_mlflow_runpipeline.mlflow.run.>
nats_dagster_composenats_dagster_mlflow_modelpipeline.mlflow.model.>
nats_dagster_flow_lineage(統合)
nats_dagster_tool_call(統合)

各 sensor は wildcard subject で subscribe し、受信メッセージの subject suffix でジョブを分岐するルーティングパターンに変更。

旧 job ファイル (chat_persist.pychat_pair_materialize.pyknowledge_events.pytool_calls.py) を削除し、ドメイン別の llm_jobs.pyobsidian_jobs.pymlflow_jobs.py を新規作成。

asset の key_prefix もドメイン別に更新:

  • ["knowledge", "embedding"]["obsidian", "embedding"]
  • knowledge_chat_auditllm_chat_audit
  • knowledge_eventspipeline_events

WS6: Vector config 更新

devstack/vector/vector.yaml にドメイン別ルーティング transforms を追加。

  source: telemetry.> (wildcard)
    |
    v
route_by_domain (subject prefix で分岐)
    |-- telemetry.llm.*     → domain: "llm"
    |-- telemetry.obsidian.* → domain: "obsidian"
    |-- telemetry.mlflow.*  → domain: "mlflow"
    |
    v
add_domain_label (.domain フィールド付与)
    |
    +-- sink: console
    +-- sink: prometheus_exporter (ドメインラベル付き)
  

追加: MinIO 外部化

devstack 内の MinIO コンテナを既存の外部 MinIO (storage.home.arpa:9000) に置き換えた。自宅ネットワークで既に稼働中の MinIO には archives、artifacts、datasets、lightdash、models のバケットがある。

  • podman-compose.yml から minio サービスと minio-data ボリュームを削除
  • dagster-user-code、dagster-daemon の S3 エンドポイントを storage.home.arpa に更新
  • minio-init コンテナで agw-mlflowagw-iceberg バケットを agw- prefix で自動作成(既存バケットとの namespace 分離)
  • MLflow の artifact root を s3://mlflow/ から s3://agw-mlflow/ に更新

注意事項

  • obsidian domain の indexer は vault diff 検知が未実装。現状は全ファイル走査の IndexAll のみ
  • NATS JetStream の旧 durable consumer 名が残骸として残る。nats-init での stream purge / 再作成が必要
  • Dagster UI でのアセットグラフ動作確認が未実施
  • obsidian domain の transport 層への接続は TODO のまま
  • WS5 実装中にツールのコンテキストクリアが発生し defs.pyassets/__init__.py の再読み込みが必要になったが、実装自体への影響はなし
  • indexer.go の初回生成時に encodeJSON ヘルパーが壊れたコードになっていた(var jsonMarshalfmt.Sprintf にフォールバックする不自然なパターン)。json.Marshal 直接呼び出しに修正済み

検証

  • 各ワークストリームで go build ./... を通過
  • Dagster 側はインポート検証済み
  • grep "pipeline\.knowledge\|telemetry\.knowledge"internal/ 配下に旧トピック残留なし
  • 全コミットは feature/redesin-devstack ブランチ上

次のアクション

項目優先度
obsidian indexer の vault diff 検知実装
nats-init での旧 stream purge / 再作成
Dagster UI でのアセットグラフ動作確認
obsidian domain の transport 層接続
MLflow webhook のエンドツーエンドテスト

コミット履歴

コミット内容
8d3f399Nessie/Trino 削除、Iceberg JDBC catalog テーブル追加
86bb7f0NATS トピック pipeline.knowledge.*pipeline.llm.* / pipeline.obsidian.* 移行
9f6371cdomain/obsidian 新規作成 (Service, VaultRepository, Indexer)
95a6ae8MLflow webhook + pipeline エンドポイント追加
57ec661Dagster sensor/job/asset をドメイン別に再構成 (4 wildcard sensors)
9186490Vector テレメトリルーティング更新
devstack MinIO → storage.home.arpa 外部化
minio-init で agw-mlflow / agw-iceberg バケット自動作成