agent-gateway v3 リデザイン — knowledge ドメイン分割と MLflow/Obsidian 統合
agent-gateway の pipeline.knowledge.* 単一ドメインを llm / obsidian / mlflow の3ドメインに分割するリデザイン記録。NATS subject、Dagster sensor/job/asset、Vector テレメトリを一括移行し、Lakehouse 簡素化と MinIO 外部化を実施。
結論
agent-gateway のパイプラインアーキテクチャを v3 としてリデザインした。核心は pipeline.knowledge.* という単一ドメインを llm、obsidian、mlflow の3ドメインに分割したこと。NATS subject、Dagster sensor/job/asset、Vector テレメトリルーティングの全てをこの3分類に揃えた。
開発段階でプロダクションデータが無い状態で実施したため、後方互換性は一切考えず、旧トピック・旧 durable consumer は単純削除とした。6ワークストリーム + MinIO 外部化で計8コミット。Go 側は各ワークストリームで go build ./... を通過、Dagster 側はインポート検証済み。
リデザインの動機は明確で、Obsidian vault 連携や MLflow 実験管理を追加しようとした時点で「knowledge」というラベルではドメイン境界が曖昧になり、Dagster の sensor ルーティングもテレメトリの分類も、何が何のためのイベントなのか名前から判別できなくなっていた。
この記事はGo + NATS + Dagster によるAIオーケストレーション基盤の続編にあたる。
前提
- 対象システム: agent-gateway — Go + Python 構成の自作 LLM オーケストレーション基盤
- ランタイム: EPYC 9175F (768GB DDR5, RTX 6000 Pro Max-Q)、Podman rootless
- メッセージング: NATS 2.11 (JetStream)
- オーケストレーション: Dagster (Python)
- テレメトリ: Vector (Rust) → Prometheus / Loki
- 新規統合対象: Obsidian vault、MLflow Model Registry
- 設計ドキュメント:
docs/2nd-design-archi.md
設計判断
ドメイン分割の方針
旧設計では全パイプラインが pipeline.knowledge.* に集約されていた。chat persist も embedding も tool call も同一の「knowledge」ラベルの下にあり、以下の問題が生じていた。
- Obsidian vault 操作を追加する際、
pipeline.knowledge.embeddingが「チャット用の embedding なのか vault indexing 用なのか」判別不能 - MLflow 実験管理のイベントを knowledge ドメインに入れるのは意味的に破綻
- Dagster sensor のルーティングロジックが subject 名から意図を読み取れない
- テレメトリの分類が機能単位ではなくなっていた
分割後の3ドメイン:
| ドメイン | 責務 | NATS subject prefix |
|---|---|---|
llm | チャット永続化、tool call、flow lineage | pipeline.llm.* |
obsidian | vault 操作、semantic search、indexing | pipeline.obsidian.* |
mlflow | 実験 run lifecycle、model registry webhook | pipeline.mlflow.* |
テレメトリも同様に telemetry.llm.* / telemetry.obsidian.* / telemetry.mlflow.* に分離。
後方互換性の排除
開発段階でプロダクションデータが無いため、マイグレーションロジックは書かない方針とした。旧トピック・旧 durable consumer は単純削除。この判断により実装コストを大幅に削減できた。
ワークストリーム構成
6ワークストリームを依存順に並べ、各 WS を1コミットとするチェックポイント方式で進めた。
WS1 (Lakehouse簡素化) → WS2 (NATSトピック移行) → WS3 + WS4 + WS5 (並行) → WS6 (Vector config)
WS3 (Go domain 層) / WS4 (Go transport 層) / WS5 (Python Dagster) はレイヤーが異なるため並行可能。
実装
WS1: Lakehouse 簡素化
Nessie + Trino + dbt-fusion のスタックは以前の ELT 最適化検証で導入したものだが、現時点では PyIceberg + JDBC catalog で十分と判断し削除した。
変更内容:
podman-compose.ymlから nessie、trino、dbt-fusion サービスと trino-warehouse ボリュームを削除internal/config/config.goからTrinoBaseURL、NessieBaseURLフィールドと env var ロードを削除devstack/postgres/init.sqlに Iceberg JDBC catalog メタデータテーブル (iceberg_tables、iceberg_namespace_properties) を追加devstack/trino/ディレクトリを丸ごと削除
WS2: NATS トピック移行
pipeline.knowledge.* の全トピック定数を新ドメイン別名前空間に一括置換した。5ファイルにまたがる変更。
| 旧トピック | 新トピック |
|---|---|
pipeline.knowledge.chat.persist | pipeline.llm.chat.persist |
pipeline.knowledge.embedding | pipeline.obsidian.semantic_search |
pipeline.knowledge.retrieve | pipeline.llm.retrieve |
pipeline.knowledge.compose | pipeline.llm.compose |
pipeline.knowledge.flow.lineage | pipeline.llm.flow.lineage |
pipeline.knowledge.tool_call | pipeline.llm.tool_call |
テレメトリトピック (telemetry.knowledge.* → telemetry.llm.* / telemetry.obsidian.*) と DAG 名 (knowledge.prompt_flow → llm.prompt_flow) も同時に更新。
grep "pipeline\.knowledge\|telemetry\.knowledge" で internal/ 配下に旧トピックの残留が無いことを確認。
WS3: domain/obsidian 新規作成
internal/domain/obsidian/ に3ファイルを新規作成した。
repository.go — VaultRepository インターフェースとローカルファイルシステム実装 LocalVault。path traversal 防止の resolve() メソッドで vault root 外へのアクセスを拒否する。
service.go — Service struct に5メソッド:
| メソッド | 責務 |
|---|---|
SemanticSearch | vault 内の意味検索 |
ReadNote | ノート読み取り |
WriteNote | ノート書き込み |
ReviseNote | ノート改訂 |
Proofread | 校正 |
LLM は直接呼ばず knowledge orchestrator 経由とし、obsidian ドメインは vault I/O と embedding に専念する設計。各操作で pipeline.obsidian.* にイベントを publish、telemetry.obsidian.* にテレメトリを送る。
indexer.go — vault 走査 → チャンク分割 → embedding → pgvector upsert のパイプライン。
vault files → scan → chunk (512 rune, 64 overlap) → embed → pgvector upsert
internal/config/config.go に ObsidianVaultDir フィールドと OBSIDIAN_VAULT_DIR env var を追加。cmd/server/main.go で obsidian.NewService を初期化するワイヤリングを追加(transport 層への接続は未実装)。
WS4: MLflow webhook エンドポイント
Go の transport 層に MLflow 統合用の2ハンドラを新規作成した。
webhook_mlflow.go — POST /v1/webhook/mlflow
MLflow Model Registry の webhook リクエストを受け取り、X-Webhook-Signature ヘッダで HMAC-SHA256 検証を行い、イベントタイプに応じて NATS に publish する。
| イベントタイプ | NATS subject |
|---|---|
| model registered | pipeline.mlflow.model.registered |
| version created | pipeline.mlflow.model.version_created |
| alias set | pipeline.mlflow.model.alias_set |
| tag set | pipeline.mlflow.model.tag_set |
pipeline_mlflow.go — 2エンドポイント:
| エンドポイント | NATS subject |
|---|---|
POST /v1/pipeline/mlflow/run | pipeline.mlflow.run.{started,completed,failed} |
POST /v1/pipeline/mlflow/metrics | pipeline.mlflow.run.metrics |
server.go に WithPipelineService ServerOption を追加し、既存の WithConversationRepository パターンに倣って DI を実装。config.go に MLflowWebhookSecret (MLFLOW_WEBHOOK_SECRET env var) を追加。
WS5: Dagster sensor/job/asset 再構成
最大の変更量: 16ファイル、+401行 / -229行。
既存の6個の per-topic sensor を4個の wildcard sensor に書き換えた。
| 旧 sensor | 新 sensor | subject |
|---|---|---|
nats_dagster_chat_persist | nats_dagster_llm | pipeline.llm.> |
nats_dagster_embedding | nats_dagster_obsidian | pipeline.obsidian.> |
nats_dagster_retrieve | nats_dagster_mlflow_run | pipeline.mlflow.run.> |
nats_dagster_compose | nats_dagster_mlflow_model | pipeline.mlflow.model.> |
nats_dagster_flow_lineage | (統合) | |
nats_dagster_tool_call | (統合) |
各 sensor は wildcard subject で subscribe し、受信メッセージの subject suffix でジョブを分岐するルーティングパターンに変更。
旧 job ファイル (chat_persist.py、chat_pair_materialize.py、knowledge_events.py、tool_calls.py) を削除し、ドメイン別の llm_jobs.py、obsidian_jobs.py、mlflow_jobs.py を新規作成。
asset の key_prefix もドメイン別に更新:
["knowledge", "embedding"]→["obsidian", "embedding"]knowledge_chat_audit→llm_chat_auditknowledge_events→pipeline_events
WS6: Vector config 更新
devstack/vector/vector.yaml にドメイン別ルーティング transforms を追加。
source: telemetry.> (wildcard)
|
v
route_by_domain (subject prefix で分岐)
|-- telemetry.llm.* → domain: "llm"
|-- telemetry.obsidian.* → domain: "obsidian"
|-- telemetry.mlflow.* → domain: "mlflow"
|
v
add_domain_label (.domain フィールド付与)
|
+-- sink: console
+-- sink: prometheus_exporter (ドメインラベル付き)
追加: MinIO 外部化
devstack 内の MinIO コンテナを既存の外部 MinIO (storage.home.arpa:9000) に置き換えた。自宅ネットワークで既に稼働中の MinIO には archives、artifacts、datasets、lightdash、models のバケットがある。
podman-compose.ymlから minio サービスと minio-data ボリュームを削除- dagster-user-code、dagster-daemon の S3 エンドポイントを
storage.home.arpaに更新 minio-initコンテナでagw-mlflowとagw-icebergバケットをagw-prefix で自動作成(既存バケットとの namespace 分離)- MLflow の artifact root を
s3://mlflow/からs3://agw-mlflow/に更新
注意事項
- obsidian domain の indexer は vault diff 検知が未実装。現状は全ファイル走査の
IndexAllのみ - NATS JetStream の旧 durable consumer 名が残骸として残る。nats-init での stream purge / 再作成が必要
- Dagster UI でのアセットグラフ動作確認が未実施
- obsidian domain の transport 層への接続は TODO のまま
- WS5 実装中にツールのコンテキストクリアが発生し
defs.pyとassets/__init__.pyの再読み込みが必要になったが、実装自体への影響はなし indexer.goの初回生成時にencodeJSONヘルパーが壊れたコードになっていた(var jsonMarshalをfmt.Sprintfにフォールバックする不自然なパターン)。json.Marshal直接呼び出しに修正済み
検証
- 各ワークストリームで
go build ./...を通過 - Dagster 側はインポート検証済み
grep "pipeline\.knowledge\|telemetry\.knowledge"でinternal/配下に旧トピック残留なし- 全コミットは
feature/redesin-devstackブランチ上
次のアクション
| 項目 | 優先度 |
|---|---|
| obsidian indexer の vault diff 検知実装 | 高 |
| nats-init での旧 stream purge / 再作成 | 高 |
| Dagster UI でのアセットグラフ動作確認 | 中 |
| obsidian domain の transport 層接続 | 中 |
| MLflow webhook のエンドツーエンドテスト | 中 |
コミット履歴
| コミット | 内容 |
|---|---|
8d3f399 | Nessie/Trino 削除、Iceberg JDBC catalog テーブル追加 |
86bb7f0 | NATS トピック pipeline.knowledge.* → pipeline.llm.* / pipeline.obsidian.* 移行 |
9f6371c | domain/obsidian 新規作成 (Service, VaultRepository, Indexer) |
95a6ae8 | MLflow webhook + pipeline エンドポイント追加 |
57ec661 | Dagster sensor/job/asset をドメイン別に再構成 (4 wildcard sensors) |
9186490 | Vector テレメトリルーティング更新 |
| — | devstack MinIO → storage.home.arpa 外部化 |
| — | minio-init で agw-mlflow / agw-iceberg バケット自動作成 |
