Conclusion

The agent-gateway pipeline architecture was redesigned as v3. The core change: splitting the monolithic pipeline.knowledge.* domain into three — llm, obsidian, and mlflow. NATS subjects, Dagster sensors/jobs/assets, and Vector telemetry routing were all aligned to this three-domain taxonomy.

This was executed during the development phase with no production data, so backward compatibility was deliberately ignored. Old topics and old durable consumers were simply deleted. The work comprised 6 workstreams plus MinIO externalization, 8 commits in total. The Go side passed go build ./... at each workstream checkpoint; the Dagster side was import-verified.

The motivation was clear: when adding Obsidian vault integration and MLflow experiment management, the “knowledge” label made domain boundaries ambiguous. Dagster sensor routing and telemetry classification could no longer determine what any given event was for just from its name.

This article is a sequel to “Go + NATS + Dagster AI Orchestration Platform”.


Prerequisites

  • Target system: agent-gateway — a custom LLM orchestration platform built with Go + Python
  • Runtime: EPYC 9175F (768GB DDR5, RTX 6000 Pro Max-Q), Podman rootless
  • Messaging: NATS 2.11 (JetStream)
  • Orchestration: Dagster (Python)
  • Telemetry: Vector (Rust) → Prometheus / Loki
  • New integrations: Obsidian vault, MLflow Model Registry
  • Design document: docs/2nd-design-archi.md

Design Decisions

Domain Split Strategy

The old design consolidated all pipelines under pipeline.knowledge.*. Chat persist, embedding, and tool call all lived under the same “knowledge” label, causing the following problems:

  • When adding Obsidian vault operations, pipeline.knowledge.embedding was indistinguishable — chat embedding or vault indexing?
  • Placing MLflow experiment management events under the knowledge domain was semantically broken
  • Dagster sensor routing logic could not infer intent from subject names
  • Telemetry classification no longer mapped to functional boundaries

The three domains after the split:

| Domain | Responsibility | NATS subject prefix |
| --- | --- | --- |
| llm | Chat persistence, tool calls, flow lineage | pipeline.llm.* |
| obsidian | Vault operations, semantic search, indexing | pipeline.obsidian.* |
| mlflow | Experiment run lifecycle, model registry webhooks | pipeline.mlflow.* |

Telemetry follows the same split: telemetry.llm.* / telemetry.obsidian.* / telemetry.mlflow.*.
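
As a minimal sketch of the taxonomy, the domain of any subject can be read from its second token. The constant names and the DomainOf helper below are hypothetical and not taken from the repository; only the prefixes (pipeline.*, telemetry.*) and the three domain names come from the text.

```go
package main

import (
	"fmt"
	"strings"
)

// Domain names from the v3 taxonomy. The identifiers are illustrative.
const (
	DomainLLM      = "llm"
	DomainObsidian = "obsidian"
	DomainMLflow   = "mlflow"
)

// DomainOf returns the domain token of a pipeline.* or telemetry.* subject.
// It reports false for subjects that fall outside the three-domain taxonomy.
func DomainOf(subject string) (string, bool) {
	parts := strings.Split(subject, ".")
	if len(parts) < 3 {
		return "", false
	}
	if parts[0] != "pipeline" && parts[0] != "telemetry" {
		return "", false
	}
	switch parts[1] {
	case DomainLLM, DomainObsidian, DomainMLflow:
		return parts[1], true
	}
	return "", false
}

func main() {
	for _, s := range []string{
		"pipeline.llm.chat.persist",
		"pipeline.mlflow.run.metrics",
		"pipeline.knowledge.embedding", // old namespace, no longer classified
	} {
		d, ok := DomainOf(s)
		fmt.Printf("%-30s domain=%q ok=%v\n", s, d, ok)
	}
}
```

With this shape, an event name alone is enough to decide which domain it belongs to, which is the property the old knowledge label had lost.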

No Backward Compatibility

With no production data in the development phase, no migration logic was written. Old topics and old durable consumers were simply deleted. This decision substantially reduced implementation cost.

Workstream Structure

Six workstreams ordered by dependency, each committed as a single checkpoint.

  WS1 (Lakehouse simplification) → WS2 (NATS topic migration) → WS3 + WS4 + WS5 (parallel) → WS6 (Vector config)
  

WS3 (Go domain layer) / WS4 (Go transport layer) / WS5 (Python Dagster) operate on different layers and could proceed in parallel.


Implementation

WS1: Lakehouse Simplification

The Nessie + Trino + dbt-fusion stack was introduced during a previous ELT optimization evaluation. At this point, PyIceberg + JDBC catalog is sufficient, so the stack was removed to reduce container count.

Changes:

  • Removed nessie, trino, dbt-fusion services and trino-warehouse volume from podman-compose.yml
  • Removed TrinoBaseURL, NessieBaseURL fields and env var loading from internal/config/config.go
  • Added Iceberg JDBC catalog metadata tables (iceberg_tables, iceberg_namespace_properties) to devstack/postgres/init.sql
  • Deleted devstack/trino/ directory entirely

WS2: NATS Topic Migration

All topic constants under pipeline.knowledge.* were batch-replaced with the new domain-specific namespaces. Changes spanned 5 files.

| Old topic | New topic |
| --- | --- |
| pipeline.knowledge.chat.persist | pipeline.llm.chat.persist |
| pipeline.knowledge.embedding | pipeline.obsidian.semantic_search |
| pipeline.knowledge.retrieve | pipeline.llm.retrieve |
| pipeline.knowledge.compose | pipeline.llm.compose |
| pipeline.knowledge.flow.lineage | pipeline.llm.flow.lineage |
| pipeline.knowledge.tool_call | pipeline.llm.tool_call |

Telemetry topics (telemetry.knowledge.* → telemetry.llm.* / telemetry.obsidian.*) and DAG names (knowledge.prompt_flow → llm.prompt_flow) were updated simultaneously.

Verified with grep "pipeline\.knowledge\|telemetry\.knowledge" that no old topics remained under internal/.
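
A sketch of what the renamed constants might look like, together with a programmatic version of the grep check. The Go identifiers (TopicChatPersist, AllTopics, IsLegacy) are assumptions made for illustration; the subject strings are the ones listed in the WS2 mapping.

```go
package main

import (
	"fmt"
	"strings"
)

// Subject constants after the WS2 rename. Identifier names are hypothetical.
const (
	TopicChatPersist    = "pipeline.llm.chat.persist"
	TopicSemanticSearch = "pipeline.obsidian.semantic_search"
	TopicRetrieve       = "pipeline.llm.retrieve"
	TopicCompose        = "pipeline.llm.compose"
	TopicFlowLineage    = "pipeline.llm.flow.lineage"
	TopicToolCall       = "pipeline.llm.tool_call"
)

// AllTopics lists every constant so that a check can iterate over them.
var AllTopics = []string{
	TopicChatPersist, TopicSemanticSearch, TopicRetrieve,
	TopicCompose, TopicFlowLineage, TopicToolCall,
}

// IsLegacy reports whether a subject still refers to the old knowledge
// namespace, the same condition the grep pattern searches for.
func IsLegacy(subject string) bool {
	return strings.Contains(subject, "pipeline.knowledge") ||
		strings.Contains(subject, "telemetry.knowledge")
}

func main() {
	for _, t := range AllTopics {
		fmt.Printf("%-36s legacy=%v\n", t, IsLegacy(t))
	}
}
```

A check like this could run as a unit test, so that a stray old topic fails the build instead of relying on a manual grep.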

WS3: New domain/obsidian Package

Three files created under internal/domain/obsidian/.

repository.go: VaultRepository interface and its local filesystem implementation, LocalVault. A resolve() method prevents path traversal by rejecting access outside the vault root.
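
The path traversal guard can be sketched as follows. This is not the repository's code: the constructor, the error value, and the exact check are assumptions, and the sketch does not resolve symlinks, which a hardened implementation would also need to consider.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

// ErrOutsideVault is a hypothetical error for paths that escape the root.
var ErrOutsideVault = errors.New("path escapes vault root")

// LocalVault is a filesystem-backed vault rooted at an absolute directory.
type LocalVault struct {
	root string
}

// NewLocalVault normalizes root to an absolute, cleaned path.
func NewLocalVault(root string) (*LocalVault, error) {
	abs, err := filepath.Abs(root)
	if err != nil {
		return nil, err
	}
	return &LocalVault{root: abs}, nil
}

// resolve joins a vault-relative path onto the root and rejects any result
// that lands outside the root, for example via "../" segments.
func (v *LocalVault) resolve(rel string) (string, error) {
	full := filepath.Join(v.root, rel) // Join also cleans the result
	r, err := filepath.Rel(v.root, full)
	if err != nil {
		return "", err
	}
	if r == ".." || strings.HasPrefix(r, ".."+string(filepath.Separator)) {
		return "", ErrOutsideVault
	}
	return full, nil
}

func main() {
	v, err := NewLocalVault("/tmp/vault")
	if err != nil {
		panic(err)
	}
	for _, p := range []string{"notes/today.md", "../outside.md", "a/../../b.md"} {
		full, err := v.resolve(p)
		fmt.Printf("%-16s -> %q err=%v\n", p, full, err)
	}
}
```

Checking the relative path computed by filepath.Rel avoids the false positive of a plain prefix comparison, where a sibling directory such as /tmp/vault-old would appear to be inside /tmp/vault.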

service.go: Service struct with 5 methods:

| Method | Responsibility |
| --- | --- |
| SemanticSearch | Semantic search within the vault |
| ReadNote | Note read |
| WriteNote | Note write |
| ReviseNote | Note revision |
| Proofread | Proofreading |

LLM calls go through the knowledge orchestrator, not directly — the obsidian domain focuses on vault I/O and embedding. Each operation publishes to pipeline.obsidian.* and sends telemetry to telemetry.obsidian.*.

indexer.go: Vault scan → chunk split → embedding → pgvector upsert pipeline.

  vault files → scan → chunk (512 rune, 64 overlap) → embed → pgvector upsert
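
The chunk step can be illustrated with a rune-based sliding window. The function below is a sketch under the assumption that "64 overlap" means each chunk shares its first 64 runes with the end of the previous one; the name ChunkRunes is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// ChunkRunes splits text into windows of at most size runes. Each window
// starts size-overlap runes after the previous one, so neighbors share
// overlap runes. Invalid parameters yield nil.
func ChunkRunes(text string, size, overlap int) []string {
	if size <= 0 || overlap < 0 || overlap >= size {
		return nil
	}
	runes := []rune(text) // index by rune so multi-byte characters stay intact
	step := size - overlap
	var chunks []string
	for start := 0; start < len(runes); start += step {
		end := start + size
		if end > len(runes) {
			end = len(runes)
		}
		chunks = append(chunks, string(runes[start:end]))
		if end == len(runes) {
			break // last window reached; avoid emitting a pure-overlap tail
		}
	}
	return chunks
}

func main() {
	text := strings.Repeat("あ", 1000) // 1000 runes, 3000 bytes
	for i, c := range ChunkRunes(text, 512, 64) {
		fmt.Printf("chunk %d: %d runes\n", i, len([]rune(c)))
	}
}
```

Counting runes rather than bytes matters for notes containing Japanese or other multi-byte text, where a byte-based cut could split a character in half.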
  

Added ObsidianVaultDir field and OBSIDIAN_VAULT_DIR env var to internal/config/config.go. Wired obsidian.NewService initialization in cmd/server/main.go (transport layer connection not yet implemented).

WS4: MLflow Webhook Endpoints

Two new handlers for MLflow integration in the Go transport layer.

webhook_mlflow.go: POST /v1/webhook/mlflow

Receives MLflow Model Registry webhook requests, verifies HMAC-SHA256 via X-Webhook-Signature header, and publishes to NATS based on event type.

| Event type | NATS subject |
| --- | --- |
| model registered | pipeline.mlflow.model.registered |
| version created | pipeline.mlflow.model.version_created |
| alias set | pipeline.mlflow.model.alias_set |
| tag set | pipeline.mlflow.model.tag_set |
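
The signature check on the webhook can be sketched with the standard library. The article does not state how the X-Webhook-Signature value is encoded, so the hex encoding used here is an assumption, as are the function names.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// VerifySignature reports whether sigHex is the hex-encoded HMAC-SHA256 of
// body under secret. Hex encoding is an assumption of this sketch.
func VerifySignature(secret, body []byte, sigHex string) bool {
	got, err := hex.DecodeString(sigHex)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	// hmac.Equal compares in constant time to avoid leaking timing information.
	return hmac.Equal(got, mac.Sum(nil))
}

// Sign produces the signature a sender would attach; used here for the demo.
func Sign(secret, body []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

func main() {
	secret := []byte("example-secret") // would come from MLFLOW_WEBHOOK_SECRET
	body := []byte(`{"event":"model registered"}`)
	sig := Sign(secret, body)

	fmt.Println("valid:   ", VerifySignature(secret, body, sig))
	fmt.Println("tampered:", VerifySignature(secret, []byte(`{"event":"x"}`), sig))
}
```

The handler needs to verify against the raw request body bytes before any JSON decoding, since re-serialized JSON generally does not reproduce the signed bytes.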

pipeline_mlflow.go: 2 endpoints:

| Endpoint | NATS subject |
| --- | --- |
| POST /v1/pipeline/mlflow/run | pipeline.mlflow.run.{started,completed,failed} |
| POST /v1/pipeline/mlflow/metrics | pipeline.mlflow.run.metrics |
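
One way the run endpoint could pick among the three run subjects is an allow-list on the reported status. The assumption here is that the request carries a status value of started, completed, or failed; the article does not show the payload, and RunSubject is a hypothetical name.

```go
package main

import "fmt"

const runSubjectPrefix = "pipeline.mlflow.run."

// RunSubject maps a run status to its NATS subject. Unknown values are
// rejected so that arbitrary request input never ends up in a subject name.
func RunSubject(status string) (string, error) {
	switch status {
	case "started", "completed", "failed":
		return runSubjectPrefix + status, nil
	}
	return "", fmt.Errorf("unknown run status %q", status)
}

func main() {
	for _, s := range []string{"started", "completed", "failed", "paused"} {
		subject, err := RunSubject(s)
		fmt.Printf("%-10s subject=%q err=%v\n", s, subject, err)
	}
}
```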

Added WithPipelineService ServerOption to server.go, following the existing WithConversationRepository DI pattern. Added MLflowWebhookSecret (MLFLOW_WEBHOOK_SECRET env var) to config.go.
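
The DI pattern referred to here is Go's functional options idiom. The sketch below shows its general shape only: the option names come from the article, while the interfaces, the Server fields, NewServer, and the recording publisher are placeholders.

```go
package main

import "fmt"

// PipelineService and ConversationRepository are placeholder interfaces;
// the real ones in the repository will differ.
type PipelineService interface {
	Publish(subject string, data []byte) error
}

type ConversationRepository interface{}

// Server holds optional dependencies injected through ServerOption values.
type Server struct {
	pipeline      PipelineService
	conversations ConversationRepository
}

// ServerOption mutates a Server during construction.
type ServerOption func(*Server)

// WithPipelineService injects the service used by the MLflow handlers.
func WithPipelineService(p PipelineService) ServerOption {
	return func(s *Server) { s.pipeline = p }
}

// WithConversationRepository is the pre-existing option the new one mirrors.
func WithConversationRepository(r ConversationRepository) ServerOption {
	return func(s *Server) { s.conversations = r }
}

// NewServer applies each option in order.
func NewServer(opts ...ServerOption) *Server {
	s := &Server{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// recordingPublisher is a stand-in that records published subjects.
type recordingPublisher struct{ subjects []string }

func (r *recordingPublisher) Publish(subject string, _ []byte) error {
	r.subjects = append(r.subjects, subject)
	return nil
}

func main() {
	pub := &recordingPublisher{}
	srv := NewServer(WithPipelineService(pub))
	_ = srv.pipeline.Publish("pipeline.mlflow.run.metrics", nil)
	fmt.Println(pub.subjects)
}
```

Because every dependency is optional, handlers that rely on an injected service need a nil check or a registration guard when the option is not supplied.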

WS5: Dagster Sensor/Job/Asset Restructuring

Largest changeset: 16 files, +401 lines / -229 lines.

Replaced the existing 6 per-topic sensors with 4 wildcard sensors.

| Old sensor | New sensor | Subject |
| --- | --- | --- |
| nats_dagster_chat_persist | nats_dagster_llm | pipeline.llm.> |
| nats_dagster_embedding | nats_dagster_obsidian | pipeline.obsidian.> |
| nats_dagster_retrieve | nats_dagster_mlflow_run | pipeline.mlflow.run.> |
| nats_dagster_compose | nats_dagster_mlflow_model | pipeline.mlflow.model.> |
| nats_dagster_flow_lineage | (consolidated) | |
| nats_dagster_tool_call | (consolidated) | |

Each sensor subscribes on a wildcard subject and routes to jobs based on the received message’s subject suffix.
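
The actual sensors are Dagster (Python) code, but the routing idea is language-neutral. The Go sketch below shows which sensor's wildcard covers a given subject and what suffix it would route on. The Route function is hypothetical; the sensor names and filters are the four new ones listed above. In NATS, a trailing > matches one or more remaining tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// sensorFilters pairs each new sensor with its wildcard subject.
var sensorFilters = []struct{ Sensor, Filter string }{
	{"nats_dagster_llm", "pipeline.llm.>"},
	{"nats_dagster_obsidian", "pipeline.obsidian.>"},
	{"nats_dagster_mlflow_run", "pipeline.mlflow.run.>"},
	{"nats_dagster_mlflow_model", "pipeline.mlflow.model.>"},
}

// Route returns the sensor whose "prefix.>" filter covers subject, plus the
// suffix after the prefix, which the sensor can use to select a job.
func Route(subject string) (sensor, suffix string, ok bool) {
	for _, f := range sensorFilters {
		prefix := strings.TrimSuffix(f.Filter, ">") // keeps the trailing dot
		// ">" needs at least one token, so the subject must be longer.
		if strings.HasPrefix(subject, prefix) && len(subject) > len(prefix) {
			return f.Sensor, subject[len(prefix):], true
		}
	}
	return "", "", false
}

func main() {
	for _, s := range []string{
		"pipeline.llm.chat.persist",
		"pipeline.mlflow.run.started",
		"pipeline.mlflow.model.alias_set",
		"pipeline.knowledge.embedding",
	} {
		sensor, suffix, ok := Route(s)
		fmt.Printf("%-34s sensor=%q suffix=%q ok=%v\n", s, sensor, suffix, ok)
	}
}
```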

Deleted old job files (chat_persist.py, chat_pair_materialize.py, knowledge_events.py, tool_calls.py) and created domain-specific llm_jobs.py, obsidian_jobs.py, mlflow_jobs.py.

Asset key_prefix updated to domain-specific values:

  • ["knowledge", "embedding"] → ["obsidian", "embedding"]
  • knowledge_chat_audit → llm_chat_audit
  • knowledge_events → pipeline_events

WS6: Vector Config Update

Added domain-based routing transforms to devstack/vector/vector.yaml.

  source: telemetry.> (wildcard)
    |
    v
route_by_domain (branch by subject prefix)
    |-- telemetry.llm.*     → domain: "llm"
    |-- telemetry.obsidian.* → domain: "obsidian"
    |-- telemetry.mlflow.*  → domain: "mlflow"
    |
    v
add_domain_label (.domain field injection)
    |
    +-- sink: console
    +-- sink: prometheus_exporter (with domain label)
  

Additional: MinIO Externalization

Replaced the devstack MinIO container with the existing external MinIO at storage.home.arpa:9000. The home network MinIO already hosts archives, artifacts, datasets, lightdash, and models buckets.

  • Removed minio service and minio-data volume from podman-compose.yml
  • Updated dagster-user-code and dagster-daemon S3 endpoints to storage.home.arpa
  • Added minio-init container to auto-create agw-mlflow and agw-iceberg buckets with agw- prefix (namespace isolation from existing buckets)
  • Updated MLflow artifact root from s3://mlflow/ to s3://agw-mlflow/

Caveats

  • The obsidian domain indexer lacks vault diff detection. Currently only full-scan IndexAll is implemented
  • Old NATS JetStream durable consumer names remain as residue. Stream purge/recreation via nats-init is needed
  • Dagster UI asset graph behavior has not been verified
  • The obsidian domain’s transport layer connection remains a TODO
  • During WS5 implementation, tool context was cleared requiring re-reads of defs.py and assets/__init__.py, but this did not affect the implementation itself
  • The initial generation of indexer.go produced a broken encodeJSON helper (an unnatural pattern that tried to make a var jsonMarshal fall back to fmt.Sprintf). It was fixed to call json.Marshal directly

Verification

  • go build ./... passed at each workstream
  • Dagster imports verified
  • grep "pipeline\.knowledge\|telemetry\.knowledge" confirmed no old topic residue under internal/
  • All commits on the feature/redesin-devstack branch

Next Actions

| Item | Priority |
| --- | --- |
| Implement vault diff detection in obsidian indexer | High |
| Old stream purge/recreation via nats-init | High |
| Verify asset graph in Dagster UI | Medium |
| Connect obsidian domain to transport layer | Medium |
| End-to-end test for MLflow webhooks | Medium |

Commit History

| Commit | Description |
| --- | --- |
| 8d3f399 | Remove Nessie/Trino, add Iceberg JDBC catalog tables |
| 86bb7f0 | Migrate NATS topics pipeline.knowledge.* → pipeline.llm.* / pipeline.obsidian.* |
| 9f6371c | Create domain/obsidian (Service, VaultRepository, Indexer) |
| 95a6ae8 | Add MLflow webhook + pipeline endpoints |
| 57ec661 | Restructure Dagster sensors/jobs/assets by domain (4 wildcard sensors) |
| 9186490 | Update Vector telemetry routing |
| | Externalize devstack MinIO → storage.home.arpa |
| | Auto-create agw-mlflow / agw-iceberg buckets via minio-init |