Rust + NATS + Dagster AI Factory: OpenAI Proxy, Idempotent Design, SSE Streaming, and Go Migration Record
Rust(axum) OpenAI-compatible proxy, NATS Core/JetStream event relay, Dagster oneshot job execution, PG idempotency design, Qdrant semantic cache, SSE streaming, Quadlet/systemd integration. Plus the full migration to Go. Complete AI factory architecture.
Background
Integrating LLMs into business pipelines often requires more than a simple request/response exchange: parallel multi-LLM inference, instant SSE feedback, idempotent retries, and traceability are all necessary.
A “thin” OpenAI-compatible proxy was built in Rust(axum), NATS Core handles event relay, and Dagster oneshot jobs execute heavy processing — forming an “AI Intelligence Factory.” This document also records the eventual full migration to Go.
Component Responsibilities
| Layer | Technology | Responsibility |
|---|---|---|
| Entry Point | Rust/Go + axum | OpenAI-compatible EP, trace_id assignment, SSE holding |
| Messaging | NATS Core | Pub/Sub event relay (`evt.chat.{trace_id}`) |
| Execution Engine | Dagster + systemd | Oneshot job execution (no resident process) |
| Observability | Loki + Grafana + Dagit | trace_id-tagged logs, Lineage tracking |
| Cache/Storage | PostgreSQL + Qdrant | Idempotency log + semantic cache |
Communication Flow
- Client → Rust API: `/v1/chat/completions` (`stream=true`). Issues `trace_id`, starts SSE
- Rust → Dagster: oneshot launch via systemd/Quadlet (`dagster-<job>@{trace_id}`)
- Dagster (op/pipeline): parallel local LLM execution. Publishes tokens/progress to `evt.chat.{trace_id}` via NATS. Sends `finished` once after artifact commit
- Rust API: `subscribe("evt.chat.{trace_id}")` on NATS, formats to OpenAI-compatible chunks, streams via SSE
- Storage: UPSERT to PG/Qdrant on stream completion
Event Schema
{"type":"role","role":"assistant"}
{"type":"token","text":"...","task":"A"}
{"type":"tool_call","name":"search","arguments":"{...}"}
{"type":"usage","usage":{...}}
{"type":"finished","reason":"stop","winner":"llama3.1-8b"}
The Rust API converts these into OpenAI-compatible `data: {...}\n\n` frames for SSE.
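A minimal sketch of that conversion in Go. The chunk shape follows the OpenAI `chat.completion.chunk` format; the `Event` struct mirrors the schema above, and the `chatcmpl-` id prefix reusing `trace_id` is an assumption, not something the article specifies.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors the NATS event schema shown above (token events only).
type Event struct {
	Type string `json:"type"`
	Text string `json:"text,omitempty"`
	Task string `json:"task,omitempty"`
}

// toSSEChunk converts one NATS event into an OpenAI-compatible
// chat.completion.chunk, framed as an SSE "data: ...\n\n" line.
func toSSEChunk(ev Event, traceID, model string) string {
	chunk := map[string]any{
		"id":     "chatcmpl-" + traceID, // assumption: trace_id reused as chunk id
		"object": "chat.completion.chunk",
		"model":  model,
		"choices": []map[string]any{
			{"index": 0, "delta": map[string]any{"content": ev.Text}},
		},
	}
	b, _ := json.Marshal(chunk)
	return fmt.Sprintf("data: %s\n\n", b)
}

func main() {
	ev := Event{Type: "token", Text: "Hello", Task: "A"}
	fmt.Print(toSSEChunk(ev, "abc123", "llama3.1-8b"))
}
```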
Idempotency and Retry Design
- `req_id = sha256(model + messages + params)`. Absorbs duplicate request execution
- PG schema: `idempotency_log(key PK, status, result, updated_at)` + `completions_cache(req_id PK, model, content, usage, created_at)`
- All side effects absorbed via UPSERT/unique constraints
- Network operations use exponential backoff + max retries
- `finished` sent only once, after artifact commit
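The key derivation and the UPSERT can be sketched as follows. The exact serialization of `messages` and `params` before hashing is an assumption (any stable serialization works, as long as it is consistent across retries), and the `ON CONFLICT` clause is a hypothetical rendering of the schema above.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// reqID derives the idempotency key: sha256(model + messages + params).
// messagesJSON/paramsJSON must be serialized deterministically so that
// a retried request hashes to the same key.
func reqID(model, messagesJSON, paramsJSON string) string {
	h := sha256.Sum256([]byte(model + messagesJSON + paramsJSON))
	return hex.EncodeToString(h[:])
}

// Side effects are absorbed by an UPSERT keyed on req_id (sketch):
const upsertSQL = `
INSERT INTO completions_cache (req_id, model, content, usage, created_at)
VALUES ($1, $2, $3, $4, now())
ON CONFLICT (req_id) DO UPDATE
  SET content = EXCLUDED.content, usage = EXCLUDED.usage`

func main() {
	fmt.Println(reqID("llama3.1-8b",
		`[{"role":"user","content":"hi"}]`, `{"temperature":0.7}`))
}
```

Because the key is content-derived, a client retry after a network failure maps onto the same row instead of triggering a second inference.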
OpenAI-Compatible Proxy Extensions
Standard OpenAI API extended with `x_` fields:

- `x_route`: `direct` (passthrough) / `rag` (knowledge search) / `workflow` (async job)
- `x_adapter`: LoRA adapter specification
- `x_project`: project identifier
- Transparent RAG: context auto-injected from Qdrant/PG (invisible to client)
NATS Core → JetStream Migration Path
Start with NATS Core for minimal setup; the design tolerates occasional message loss because retries are idempotent.
JetStream migration diff:
- Add a REQ stream: Rust → `js.publish("req.chat", {...})`; Dagster uses a pull consumer with fetch → ack. Guarantees acceptance + retry
- EVT as push consumer: assign a `deliver_subject`; push delivery is relayed directly to SSE
Quadlet/systemd Integration
```ini
# dagster-debate@.container (Type=oneshot)
ContainerEnv=TRACE_ID=%i
AutoRemove=yes
Restart=on-failure
After=nats.service
```
The Rust side launches jobs with `systemd-run --user -u dagster-debate@{trace_id}`. `loginctl enable-linger ksh3` keeps the user systemd instance resident.
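The launch from the API side reduces to building an `exec.Command`. A sketch, assuming the entrypoint path from the directory layout below; the exact command run inside the transient unit is a placeholder, and `TRACE_ID` mirrors the `ContainerEnv=TRACE_ID=%i` line of the Quadlet template.

```go
package main

import (
	"fmt"
	"os/exec"
)

// oneshotCmd builds the systemd-run invocation described above:
// a transient user unit named dagster-<job>@<trace_id>, with the
// trace id exported so the entrypoint can tag every event/log.
func oneshotCmd(job, traceID string) *exec.Cmd {
	unit := fmt.Sprintf("dagster-%s@%s", job, traceID)
	return exec.Command("systemd-run", "--user",
		"-u", unit,
		"--setenv", "TRACE_ID="+traceID,
		"/dagster/entrypoints/run_"+job+".py") // placeholder entrypoint path
}

func main() {
	cmd := oneshotCmd("debate", "abc123")
	fmt.Println(cmd.Args) // inspect only; not executed here
}
```

Because the unit is transient and `Type=oneshot`, nothing stays resident between runs; failure handling is delegated to systemd's `Restart=on-failure`.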
Directory Structure
```
/dagster
  /systemd      # Quadlet templates
  /entrypoints  # run_xxx.py (receives TRACE_ID)
  /repo         # ops/jobs/assets/pipelines + definitions.py
  /instance     # Shared DAGSTER_HOME
  /fn           # Common functions (NATS publish, PG/Qdrant I/O)
```
Rust to Go Migration
Why Migrate
The Rust design was precise, but implementation cost was high for the heavy async patterns (SSE relay, NATS Pub/Sub, and parallel search all running concurrently).
- Managing async contexts for SSE + NATS subscription + PG/Qdrant writes was verbose in Rust
- Go's goroutine + channel model naturally fits this pattern
- The runtime overhead difference is negligible for this use case
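The goroutine + channel fit can be shown with the core relay loop: events arrive on one channel (standing in for a NATS subscription) and leave as SSE frames on another, terminating on `finished`. This is a simplified stand-in, not the production loop.

```go
package main

import "fmt"

// relay fans NATS-style events into SSE frames until a "finished"
// event arrives — the shape of the loop the Go port uses in place
// of Rust's async select. events stands in for a NATS subscription.
func relay(events <-chan string, sse chan<- string) {
	for ev := range events {
		sse <- "data: " + ev + "\n\n"
		if ev == `{"type":"finished"}` {
			close(sse) // end the SSE stream exactly once
			return
		}
	}
}

func main() {
	events := make(chan string, 3)
	sse := make(chan string, 3)
	events <- `{"type":"token","text":"hi"}`
	events <- `{"type":"finished"}`
	go relay(events, sse)
	for frame := range sse {
		fmt.Print(frame)
	}
}
```

The equivalent Rust version needs pinned streams and explicit `select!` plumbing; in Go the same flow is a plain loop plus one goroutine.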
Design Principles Preserved in Go
- OpenAI-compatible endpoints
- NATS Pub/Sub event-driven architecture
- Dagster oneshot job launching
- trace_id on all logs
- Idempotent design (PG UPSERT)
The Rust design documents remain valuable as “specifications.” The Go implementation is a “concise translation” of the Rust design.
Operational Knobs
- Parallelism: Dagster Dynamic Mapping + run tags, or systemd concurrent start limits
- Timeouts: EP-wide / per-job timeout settings
- Cache threshold: Qdrant similarity >= 0.92 gates the "provisional response"
- Cancellation: client disconnect publishes `cancel.{trace_id}` to NATS → ops early-exit
Observability
- Logs: JSON with mandatory `trace_id`. Promtail → Loki → Grafana with `{trace_id="..."} | json` queries
- Metrics: SSE latency, drop rate, oneshot success rate dashboarded
- Dagit: shared `DAGSTER_HOME`/Run Storage (Postgres recommended) for browsing past Runs/Lineage
Addendum: Middleware Simplification (Qdrant Dependency Removal)
Subsequent optimization internalized the `/v1/embeddings` endpoint in Rust(axum) and eliminated the Qdrant dependency entirely.
Changes
- Embedding API internalized: `/v1/embeddings` processed directly within the Rust(axum) server. ONNX Runtime handles local vector generation, removing all external service calls
- PostgreSQL 18 tuning: enabled `jit` (JIT compilation) and added the `pgvector` extension (vector search). Semantic cache and vector similarity search are now handled by PG alone
- Qdrant removed: with semantic cache functionality consolidated into PG + pgvector, the Qdrant container was decommissioned
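The consolidated cache lookup can be sketched as one pgvector query plus the similarity threshold from the operational knobs. Table and column names are assumptions; `<=>` is pgvector's cosine-distance operator, so similarity is `1 - distance`.

```go
package main

import "fmt"

// Semantic-cache lookup against PG + pgvector (sketch; schema names
// are assumptions). The nearest cached completion is returned along
// with its cosine similarity to the query embedding.
const cacheQuery = `
SELECT content, 1 - (embedding <=> $1) AS similarity
FROM completions_cache
ORDER BY embedding <=> $1
LIMIT 1`

// cacheHit applies the similarity >= 0.92 knob described earlier.
func cacheHit(similarity float64) bool {
	return similarity >= 0.92
}

func main() {
	fmt.Println(cacheHit(0.95), cacheHit(0.80))
}
```

With the embedding generated in-process by ONNX Runtime, the whole cache path is one round trip to PG, matching the "network hops: 1" row below.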
Impact
| Before | After |
|---|---|
| PG + Qdrant (2 services) | PG 18 + pgvector (1 service) |
| Embedding → external API or Qdrant built-in | Embedding → Rust(axum) + ONNX Runtime |
| Network hops: 3 (API→Qdrant→PG) | Network hops: 1 (API→PG) |
Local network middleware is now simpler, reducing operational and debugging surface area.

