Background

Integrating LLMs into business pipelines often requires more than a simple request-response exchange: parallel multi-LLM inference, instant feedback over SSE, idempotent retries, and end-to-end traceability are all needed.

A “thin” OpenAI-compatible proxy was built in Rust (axum); NATS Core handles event relay, and Dagster oneshot jobs execute the heavy processing, forming an “AI Intelligence Factory.” This document also records the eventual full migration of the proxy to Go.

Component Responsibilities

  Layer             Technology              Responsibility
  Entry Point       Rust/Go + axum          OpenAI-compatible EP, trace_id assignment, SSE holding
  Messaging         NATS Core               Pub/Sub event relay (evt.chat.{trace_id})
  Execution Engine  Dagster + systemd       Oneshot job execution (no resident process)
  Observability     Loki + Grafana + Dagit  trace_id-tagged logs, Lineage tracking
  Cache/Storage     PostgreSQL + Qdrant     Idempotency log + semantic cache

Communication Flow

  1. Client → Rust API: /v1/chat/completions (stream=true). Issues trace_id, starts SSE
  2. Rust → Dagster: Oneshot launch via systemd/Quadlet (dagster-<job>@{trace_id})
  3. Dagster (op/pipeline): Parallel local LLM execution. Publishes tokens/progress to evt.chat.{trace_id} via NATS. Sends finished once after artifact commit
  4. Rust API: subscribe("evt.chat.{trace_id}") on NATS, formats to OpenAI-compatible chunks, streams via SSE
  5. Storage: UPSERT to PG/Qdrant on stream completion

Event Schema

  {"type":"role","role":"assistant"}
  {"type":"token","text":"...","task":"A"}
  {"type":"tool_call","name":"search","arguments":"{...}"}
  {"type":"usage","usage":{...}}
  {"type":"finished","reason":"stop","winner":"llama3.1-8b"}

The Rust API converts these into OpenAI-compatible data: {...}\n\n frames for SSE.

Idempotency and Retry Design

  • req_id = sha256(model + messages + params); duplicate requests hash to the same key, so repeated execution is absorbed
  • PG schema: idempotency_log(key PK, status, result, updated_at) + completions_cache(req_id PK, model, content, usage, created_at)
  • All side effects absorbed via UPSERT/unique constraints
  • Network operations use exponential backoff + max retries
  • finished sent once, only after artifact commit
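The key derivation can be sketched in Go; exactly which params enter the hash is an assumption here:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
)

// RequestKey bundles the fields that define request identity.
// The parameter set (here just temperature) is illustrative.
type RequestKey struct {
	Model       string              `json:"model"`
	Messages    []map[string]string `json:"messages"`
	Temperature float64             `json:"temperature"`
}

// ReqID implements req_id = sha256(model + messages + params).
// json.Marshal is deterministic for structs (fixed field order) and
// sorts map keys, so the same logical request always yields the same key.
func ReqID(k RequestKey) string {
	b, _ := json.Marshal(k)
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}
```

The resulting hex string is what the idempotency_log and completions_cache primary keys constrain, so a duplicate request collapses into a single UPSERT.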

OpenAI-Compatible Proxy Extensions

Standard OpenAI API extended with x_ fields:

  • x_route: direct (passthrough) / rag (knowledge search) / workflow (async job)
  • x_adapter: LoRA adapter specification
  • x_project: Project identifier
  • Transparent RAG: Context auto-injected from Qdrant/PG (invisible to client)
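In Go the extended body deserializes into a single struct alongside the standard fields (struct name is an assumption; most standard OpenAI fields are omitted for brevity):

```go
package main

import "encoding/json"

// ChatRequest extends the standard OpenAI request body with the x_ fields
// described above. Clients that send only standard fields still parse fine,
// since the extensions are optional.
type ChatRequest struct {
	Model    string              `json:"model"`
	Messages []map[string]string `json:"messages"`
	Stream   bool                `json:"stream"`

	// Proxy extensions (not part of the OpenAI API).
	XRoute   string `json:"x_route,omitempty"`   // "direct" | "rag" | "workflow"
	XAdapter string `json:"x_adapter,omitempty"` // LoRA adapter name
	XProject string `json:"x_project,omitempty"` // project identifier
}

// ParseChatRequest decodes a request body; unknown fields are ignored.
func ParseChatRequest(body []byte) (ChatRequest, error) {
	var req ChatRequest
	err := json.Unmarshal(body, &req)
	return req, err
}
```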

NATS Core → JetStream Migration Path

Start with NATS Core for a minimal setup. Robustness comes not from broker guarantees but from tolerating occasional message loss and making every retry idempotent.

JetStream migration diff:

  • Add REQ stream: Rust → js.publish("req.chat", {...}), Dagster uses pull consumer with fetch → ack. Guaranteed acceptance + retry
  • EVT as push consumer: Assign deliver_subject, push delivery relayed directly to SSE

Quadlet/systemd Integration

  # [email protected] (Type=oneshot)
ContainerEnv=TRACE_ID=%i
AutoRemove=yes
Restart=on-failure
After=nats.service
  

The Rust side launches jobs with systemd-run --user -u dagster-debate@{trace_id}; loginctl enable-linger ksh3 keeps the user systemd instance resident even without an active login session.
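The launch can be sketched as building the systemd-run argv; the entrypoint command and the --collect flag are assumptions beyond the text:

```go
package main

import "fmt"

// SystemdRunArgs builds the argv for launching one oneshot Dagster job,
// mirroring `systemd-run --user -u dagster-<job>@{trace_id}` above.
// "dagster-entry" is a hypothetical entrypoint binary.
func SystemdRunArgs(job, traceID string) []string {
	unit := fmt.Sprintf("dagster-%s@%s", job, traceID)
	return []string{
		"systemd-run",
		"--user", // run under the user manager kept alive by enable-linger
		"-u", unit, // transient unit name carries the trace_id
		"--collect", // garbage-collect the unit after it exits
		"dagster-entry", traceID, // hypothetical entrypoint; receives TRACE_ID
	}
}
```

Real code would hand this slice to exec.Command(args[0], args[1:]...) after subscribing to evt.chat.{trace_id}.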

Directory Structure

  /dagster
    /systemd      # Quadlet templates
    /entrypoints  # run_xxx.py (receives TRACE_ID)
    /repo         # ops/jobs/assets/pipelines + definitions.py
    /instance     # Shared DAGSTER_HOME
    /fn           # Common functions (NATS publish, PG/Qdrant I/O)

Rust to Go Migration

Why Migrate

The Rust design was precise, but the writing cost was high for the heavy async patterns involved (SSE relay, NATS Pub/Sub, and parallel search all running simultaneously).

  • Managing async contexts for SSE + NATS subscription + PG/Qdrant writes was verbose in Rust
  • goroutine + channel naturally fits this pattern
  • Runtime overhead difference is negligible for this use case
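The pattern behind these bullets, fan-in over a single channel, can be sketched as follows; the worker body stands in for a real per-model LLM stream:

```go
package main

import "sync"

// Token carries one streamed fragment plus the task that produced it,
// mirroring the {"type":"token","task":"A"} events.
type Token struct {
	Task string
	Text string
}

// FanIn runs one goroutine per task, merges their output into a single
// channel, and closes it when all workers finish -- the shape that replaces
// the equivalent async-Rust plumbing.
func FanIn(tasks map[string][]string) <-chan Token {
	out := make(chan Token)
	var wg sync.WaitGroup
	for task, texts := range tasks {
		wg.Add(1)
		go func(task string, texts []string) {
			defer wg.Done()
			for _, t := range texts {
				out <- Token{Task: task, Text: t} // stand-in for a real LLM stream
			}
		}(task, texts)
	}
	go func() {
		wg.Wait()
		close(out) // lets the SSE writer terminate with a plain range loop
	}()
	return out
}
```

The SSE side then reduces to `for tok := range FanIn(...)`, with no explicit future or executor management.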

Design Principles Preserved in Go

  • OpenAI-compatible endpoints
  • NATS Pub/Sub event-driven architecture
  • Dagster oneshot job launching
  • trace_id on all logs
  • Idempotent design (PG UPSERT)

The Rust design documents remain valuable as “specifications.” The Go implementation is a “concise translation” of the Rust design.

Operational Knobs

  • Parallelism: Dagster Dynamic Mapping + run tags, or systemd concurrent start limits
  • Timeouts: EP-wide / per-job Timeout settings
  • Cache threshold: Qdrant similarity >= 0.92 for “provisional response” control
  • Cancellation: Client disconnect publishes cancel.{trace_id} to NATS → ops early-exit

Observability

  • Logs: JSON with mandatory trace_id. Promtail → Loki → Grafana with {trace_id="..."} | json queries
  • Metrics: SSE latency, drop rate, and oneshot success rate dashboarded
  • Dagit: Shared DAGSTER_HOME/Run Storage (Postgres recommended) for past Run/Lineage browsing

Addendum: Middleware Simplification (Qdrant Dependency Removal)

A subsequent optimization internalized the /v1/embeddings endpoint in the Rust (axum) server and eliminated the Qdrant dependency entirely.

Changes

  • Embedding API internalized: /v1/embeddings is processed directly within the Rust (axum) server. ONNX Runtime handles vector generation locally, removing all external service calls
  • PostgreSQL 18: Enabled jit (JIT compilation) and added the pgvector extension (vector search). Semantic cache and vector similarity search are now handled by PG alone
  • Qdrant removed: With semantic cache functionality consolidated into PG + pgvector, the Qdrant container was decommissioned

Impact

  Before                                       After
  PG + Qdrant (2 services)                     PG 18 + pgvector (1 service)
  Embedding → external API or Qdrant built-in  Embedding → Rust (axum) + ONNX Runtime
  Network hops: 3 (API→Qdrant→PG)              Network hops: 1 (API→PG)

Local network middleware is now simpler, reducing operational and debugging surface area.