Dagster + NATS JetStream Event Pipeline: Implementation Deep Dive
Implementation details of the event consumption side where Dagster sensors pull subscribe from NATS JetStream and materialize assets from fire-and-forget events published by agent-gateway. Covers the evolution from GraphQL trigger bridge to native sensors, table-driven routing, PostgreSQL schema design, and idempotency guarantees.
Conclusion
Within the Go + NATS + Dagster AI orchestration platform, the Dagster + NATS JetStream event pipeline is the consumption side, fully decoupled from the gateway. This article covers its implementation in detail.
The core design principle: “the gateway only publishes, Dagster sensors pull subscribe independently.” The initial implementation used a standalone NATS Consumer Worker plus GraphQL API trigger bridge, which was later consolidated into Dagster native sensors. Table-driven topic-to-job mapping, idempotent PostgreSQL schemas, and correlation_id-based end-to-end tracing form the backbone of this pipeline.
Prerequisites
This article details a pipeline running on the platform described in the full Go-based architecture record.
- Messaging: NATS 2.11 (JetStream enabled)
- Orchestration: Dagster (Python) — daemon + sensor + asset materialization
- Data store: PostgreSQL 18 + pgvector (JIT enabled)
- Event publisher: agent-gateway (Go/Gin) — fire-and-forget publish
Why Event-Driven
Initially, the Knowledge Service wrote directly to PostgreSQL after generating chat responses. The following problems surfaced.
| Problem | Impact |
|---|---|
| Response latency | Post-processing time included in user-facing response |
| Scalability | Multiple post-processing steps executed sequentially |
| Observability | Difficult to identify which step failed and when |
| Reproducibility | RAG logic validation and improvement became hard |
The fix was to make the gateway publish events only, with Dagster consuming them independently. This removed post-processing time from the user-facing response path and made post-processing changes invisible to the gateway.
Consumer Architecture Evolution
Phase 1: NATS Consumer Worker + GraphQL Bridge
The initial implementation used a standalone Python process subscribing to NATS and calling Dagster’s GraphQL API to launch jobs.
agent-gateway → NATS pub → Consumer Worker → GraphQL mutation → Dagster job
import json

async def message_handler(msg):
    # Decode the NATS message body and hand it to the GraphQL bridge.
    payload = json.loads(msg.data.decode())
    submit_dagster_run(
        job_name="knowledge_chat_persist",
        payload=payload,
        correlation_id=payload.get("correlation_id", ""),
    )
GraphQL mutation:
mutation LaunchRun($executionParams: ExecutionParams!) {
  launchRun(executionParams: $executionParams) {
    __typename
    ... on LaunchRunSuccess {
      run { runId }
    }
    ... on RunConfigValidationInvalid {
      errors { message reason }
    }
  }
}
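The source does not show the body of submit_dagster_run. As a rough sketch of what the bridge had to assemble, the helper below builds the JSON body for a POST to Dagster's GraphQL endpoint. The function name, the location and repository values, and the choice to carry correlation_id as a run tag are assumptions for illustration, not the original implementation.

```python
import json


def build_launch_run_body(mutation, job_name, run_config, correlation_id,
                          location_name, repository_name):
    """Return the JSON request body for a launchRun mutation (no network I/O)."""
    variables = {
        "executionParams": {
            # Identifies which job in which code location to launch.
            "selector": {
                "repositoryLocationName": location_name,
                "repositoryName": repository_name,
                "jobName": job_name,
            },
            # Run config is passed through unchanged; its layout is job-specific.
            "runConfigData": run_config,
            # Assumption: correlation_id travels as a run tag for tracing.
            "executionMetadata": {
                "tags": [{"key": "correlation_id", "value": correlation_id}],
            },
        }
    }
    return json.dumps({"query": mutation, "variables": variables})
```

Every published message turns into one such HTTP request, which is why the GraphQL endpoint became an extra failure point between NATS and the job.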
Problems with this approach:
- Consumer Worker as a separate process added operational overhead
- GraphQL API became an intermediate failure point
- Dagster lineage tracking and consumer state were disconnected
Phase 2: Multi-Job Fan-Out + Table-Driven Design
As topic count grew, conditional routing logic became unwieldy. Table-driven design solved this.
TOPIC_JOB_MAP = {
    "pipeline.knowledge.chat.persist": [
        ("knowledge_chat_persist", build_audit_run_config),
        ("knowledge_chat_pair_materialize", build_asset_run_config),
    ],
    "pipeline.knowledge.embedding": [
        ("knowledge_embedding", build_event_run_config),
    ],
    "pipeline.knowledge.retrieve": [
        ("knowledge_retrieve", build_event_run_config),
    ],
    "pipeline.knowledge.compose": [
        ("knowledge_compose", build_event_run_config),
    ],
}
- Adding a topic requires only a new table entry; the routing code itself does not change
- Configuration builders for each job remain independent
- Fan-out (one message to multiple jobs) expressed declaratively
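A minimal sketch of the lookup that such a table enables. The three builder functions are placeholders (the real builders are not shown in this article), and the route helper is a hypothetical name. Only two topics are included to keep the example short.

```python
def build_audit_run_config(payload):
    # Placeholder builder: the real run-config layout is not shown in the article.
    return {"kind": "audit", "payload": payload}


def build_asset_run_config(payload):
    return {"kind": "asset", "payload": payload}


def build_event_run_config(payload):
    return {"kind": "event", "payload": payload}


TOPIC_JOB_MAP = {
    "pipeline.knowledge.chat.persist": [
        ("knowledge_chat_persist", build_audit_run_config),
        ("knowledge_chat_pair_materialize", build_asset_run_config),
    ],
    "pipeline.knowledge.embedding": [
        ("knowledge_embedding", build_event_run_config),
    ],
}


def route(topic, payload):
    """Return one (job_name, run_config) pair per job registered for the topic."""
    return [(job, build(payload)) for job, build in TOPIC_JOB_MAP.get(topic, [])]


# One chat.persist message fans out to two jobs; unknown topics yield nothing.
fan_out = route("pipeline.knowledge.chat.persist", {"correlation_id": "corr-1"})
unknown = route("pipeline.unknown", {})
```

The routing function stays the same size no matter how many topics are registered, which is the property the bullets above describe.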
Phase 3: Dagster Native Sensors (Current)
The GraphQL trigger bridge was eliminated in favor of Dagster’s native sensor mechanism.
agent-gateway → NATS JetStream pub (fire-and-forget)
↓
Dagster daemon → sensor (pull subscribe, durable consumer)
↓
RunRequest → direct job execution → asset materialization
Sensors pull subscribe directly from JetStream and issue RunRequests to execute jobs. The Consumer Worker and GraphQL API are no longer needed, reducing the component count.
Active sensors:
| Sensor | Topic | Job |
|---|---|---|
| nats_dagster_chat_persist | pipeline.knowledge.chat.persist | knowledge_chat_persist + knowledge_chat_pair_materialize |
| nats_dagster_embedding | pipeline.knowledge.embedding | knowledge_embedding |
| nats_dagster_retrieve | pipeline.knowledge.retrieve | knowledge_retrieve |
| nats_dagster_compose | pipeline.knowledge.compose | knowledge_compose |
| nats_dagster_flow_lineage | pipeline.knowledge.flow.lineage | knowledge_flow_lineage |
| nats_dagster_tool_call | pipeline.knowledge.tool_call | knowledge_tool_call |
The table-driven topic-to-job mapping concept carries directly into the sensor design.
NATS Message Schema
Chat Persistence Message
{
  "pipeline_id": "knowledge.chat.persist",
  "correlation_id": "unique-trace-id",
  "idempotency_key": "idempotency-unique-id",
  "model": "llm-model-name",
  "prompt": "user-provided-prompt-text",
  "response": "llm-generated-response-text",
  "messages": [{"role": "user", "content": "..."}],
  "usage": {"prompt_tokens": 100, "completion_tokens": 50},
  "requested_at": 1697845200000,
  "payload_json": "additional structured data"
}
Design Points
| Field | Purpose |
|---|---|
| correlation_id | End-to-end tracing. Propagates from gateway through NATS, Dagster, PostgreSQL, to analytics queries |
| idempotency_key | Prevents duplicate processing. Combined with PostgreSQL UNIQUE constraints |
| payload_json | Extensibility. Stores additional metadata as JSONB |
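A sketch of how a consumer might parse and validate this message before acting on it. The ChatPersistMessage type, the parse_chat_persist function, and the decision to treat pipeline_id, correlation_id, and idempotency_key as required are assumptions. The article does not state which fields are mandatory.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict

# Assumption: these three fields are required for routing, tracing and dedup.
REQUIRED_FIELDS = ("pipeline_id", "correlation_id", "idempotency_key")
KNOWN_FIELDS = REQUIRED_FIELDS + ("model", "prompt", "response")


@dataclass(frozen=True)
class ChatPersistMessage:
    pipeline_id: str
    correlation_id: str
    idempotency_key: str
    model: str = ""
    prompt: str = ""
    response: str = ""
    # Everything else (messages, usage, requested_at, ...) is kept as-is.
    extra: Dict[str, Any] = field(default_factory=dict)


def parse_chat_persist(raw: bytes) -> ChatPersistMessage:
    """Decode a NATS message body and reject it if required fields are missing."""
    data = json.loads(raw.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError("message body must be a JSON object")
    missing = [name for name in REQUIRED_FIELDS if not data.get(name)]
    if missing:
        raise ValueError("missing required fields: " + ", ".join(missing))
    return ChatPersistMessage(
        pipeline_id=data["pipeline_id"],
        correlation_id=data["correlation_id"],
        idempotency_key=data["idempotency_key"],
        model=data.get("model", ""),
        prompt=data.get("prompt", ""),
        response=data.get("response", ""),
        extra={k: v for k, v in data.items() if k not in KNOWN_FIELDS},
    )
```

Rejecting a message without a correlation_id or idempotency_key early keeps untraceable or non-deduplicable rows out of PostgreSQL.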
JetStream Stream Configuration
| Stream | Subjects | Retention | Max Age | Storage |
|---|---|---|---|---|
| PIPELINE | pipeline.> | limits | 72h | file |
| TELEMETRY | telemetry.> | limits | 24h | file |
The PIPELINE stream is consumed by Dagster sensors. The TELEMETRY stream is consumed by Vector (Rust) and forwarded to Prometheus and Loki.
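For reference, the table above can be written as JetStream stream configuration objects. The JetStream API expresses max_age in nanoseconds, so the hour values need converting. The stream_config helper is a hypothetical convenience, and only the fields listed in the table are included.

```python
NANOS_PER_HOUR = 3600 * 1_000_000_000


def stream_config(name, subjects, max_age_hours):
    """Build a JetStream stream config dict with max_age in nanoseconds."""
    return {
        "name": name,
        "subjects": subjects,
        "retention": "limits",
        "max_age": max_age_hours * NANOS_PER_HOUR,
        "storage": "file",
    }


PIPELINE = stream_config("PIPELINE", ["pipeline.>"], 72)
TELEMETRY = stream_config("TELEMETRY", ["telemetry.>"], 24)
```

With limits retention, messages older than max_age are discarded whether or not a consumer has processed them.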
PostgreSQL Schema Design
knowledge_chat_audit Table
Audit log for chat persistence.
CREATE TABLE knowledge_chat_audit (
    id              BIGSERIAL PRIMARY KEY,
    pipeline_id     TEXT NOT NULL,
    correlation_id  TEXT,
    idempotency_key TEXT,
    model           TEXT,
    prompt          TEXT,
    response        TEXT,
    payload_json    JSONB NOT NULL DEFAULT '{}',
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
knowledge_events Table (Unified)
Shared event table for multiple knowledge processing steps: embedding, retrieve, and compose.
CREATE TABLE knowledge_events (
    id              BIGSERIAL PRIMARY KEY,
    event_id        TEXT UNIQUE,              -- SHA256(event_type:correlation_id)[:32]
    event_type      TEXT NOT NULL,            -- 'embedding' | 'retrieve' | 'compose'
    correlation_id  TEXT NOT NULL,
    pipeline_id     TEXT NOT NULL,
    llm_model       TEXT,
    payload_json    JSONB DEFAULT '{}'::jsonb,
    created_at      TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_knowledge_events_type ON knowledge_events (event_type);
CREATE INDEX idx_knowledge_events_corr ON knowledge_events (correlation_id);
-- An expression index needs its own parentheses, and a timestamptz-to-date cast
-- is only immutable once the time zone is fixed (UTC is assumed here).
CREATE INDEX idx_knowledge_events_date ON knowledge_events (((created_at AT TIME ZONE 'UTC')::date));
Design decisions:
- event_id UNIQUE constraint: last line of defense for idempotency. A duplicate insert conflicts on this constraint and is dropped via ON CONFLICT instead of creating a second row
- event_type discriminator: unifying events in one table instead of fragmenting across many simplifies analytics queries
- Indexes: fast filtering on event_type and correlation_id, date-based aggregation on created_at
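The event_id formula in the schema comment, SHA256(event_type:correlation_id)[:32], can be sketched as follows. The UTF-8 encoding and lowercase hex digest are assumptions. The optional discriminator argument is hypothetical and not part of the original design; it only illustrates one way to keep several events of the same type under one correlation_id from sharing an id.

```python
import hashlib


def make_event_id(event_type, correlation_id, discriminator=""):
    """Derive a deterministic 32-character id from the event's identifying fields."""
    key = f"{event_type}:{correlation_id}"
    if discriminator:
        # Hypothetical extension: distinguishes repeated events of the same type.
        key = f"{key}:{discriminator}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:32]


first = make_event_id("embedding", "corr-1")
redelivered = make_event_id("embedding", "corr-1")
other_type = make_event_id("retrieve", "corr-1")
```

Because the id depends only on its inputs, a redelivered message reproduces the same event_id and conflicts on the UNIQUE constraint instead of adding a row.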
Idempotency and Error Handling
Idempotency Design
| Layer | Mechanism |
|---|---|
| NATS | JetStream durable consumer retains messages until ACKed |
| Dagster | Sensor validates message content before issuing RunRequest |
| PostgreSQL | UNIQUE constraints on idempotency_key / event_id reject duplicates |
All side effects go through UPSERTs (ON CONFLICT) or UNIQUE constraints.
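The ON CONFLICT behaviour can be tried locally without a PostgreSQL server. The sketch below uses Python's built-in sqlite3 module as a stand-in, since SQLite accepts the same ON CONFLICT ... DO NOTHING clause. The table is reduced to three columns and insert_event is a hypothetical helper.

```python
import sqlite3


def insert_event(conn, event_id, event_type, correlation_id):
    """Insert an event; return True if a row was written, False if it was a duplicate."""
    cur = conn.execute(
        "INSERT INTO knowledge_events (event_id, event_type, correlation_id) "
        "VALUES (?, ?, ?) ON CONFLICT(event_id) DO NOTHING",
        (event_id, event_type, correlation_id),
    )
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE knowledge_events ("
    " id INTEGER PRIMARY KEY,"
    " event_id TEXT UNIQUE,"
    " event_type TEXT NOT NULL,"
    " correlation_id TEXT NOT NULL)"
)

# Simulate the same message being delivered twice.
first = insert_event(conn, "abc123", "embedding", "corr-1")
second = insert_event(conn, "abc123", "embedding", "corr-1")
row_count = conn.execute("SELECT COUNT(*) FROM knowledge_events").fetchone()[0]
```

The second insert neither raises an error nor adds a row, so a job rerun after redelivery stays harmless.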
Error Handling
- NATS message ACK is delayed until Dagster job launch succeeds (in Phase 1/2, until GraphQL call succeeds)
- Failed messages remain in JetStream’s redelivery queue
- Dagster job failures are tracked in both the Dagster UI (formerly Dagit) and slog logs
- correlation_id in all logs enables single-query tracing across gateway, NATS, Dagster, and PostgreSQL
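The ACK-after-launch rule can be sketched independently of NATS and Dagster. FakeMsg mimics the two message members used here (a data attribute and an awaitable ack()), and launch stands in for whatever starts the Dagster job. Both are illustrative stand-ins, as is the process_batch name.

```python
import asyncio
import json


class FakeMsg:
    """Stand-in for a JetStream message: raw bytes plus an awaitable ack()."""

    def __init__(self, data):
        self.data = data
        self.acked = False

    async def ack(self):
        self.acked = True


async def process_batch(msgs, launch):
    """Launch one job per message and ACK only the messages whose launch succeeded."""
    launched = 0
    for msg in msgs:
        try:
            payload = json.loads(msg.data.decode("utf-8"))
            launch(payload)
        except Exception:
            # No ACK: the message stays pending and JetStream redelivers it later.
            # A production consumer would likely also cap redeliveries.
            continue
        await msg.ack()
        launched += 1
    return launched


def flaky_launch(payload):
    # Simulates a job launch that fails for some messages.
    if payload.get("fail"):
        raise RuntimeError("launch failed")


msgs = [FakeMsg(b'{"correlation_id": "a"}'), FakeMsg(b'{"fail": true}')]
launched = asyncio.run(process_batch(msgs, flaky_launch))
```

Ordering the ACK after the launch trades possible duplicates for no lost messages, which is why the idempotent writes in PostgreSQL matter.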
Ordering Guarantees
Order across embedding, retrieve, and compose is not guaranteed at the NATS level. Dagster Asset dependencies enforce execution order where needed, and correlation_id enables post-hoc event correlation and validation.
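As an illustration of post-hoc validation, the sketch below groups (event_type, correlation_id) pairs, as they might be read from knowledge_events, and reports which stages are missing per correlation_id. The assumption that every correlation_id should eventually have all three stages is made for this example only, and missing_stages is a hypothetical helper.

```python
from collections import defaultdict

# Assumption for this example: each request should produce all three stages.
EXPECTED_STAGES = ("embedding", "retrieve", "compose")


def missing_stages(events):
    """Map each incomplete correlation_id to the stages it has not recorded yet."""
    seen = defaultdict(set)
    for event_type, correlation_id in events:
        seen[correlation_id].add(event_type)
    report = {}
    for correlation_id, types in seen.items():
        missing = [stage for stage in EXPECTED_STAGES if stage not in types]
        if missing:
            report[correlation_id] = missing
    return report


# Arrival order does not matter: "a" is complete even though compose came first.
rows = [
    ("compose", "a"),
    ("embedding", "a"),
    ("retrieve", "a"),
    ("embedding", "b"),
]
incomplete = missing_stages(rows)
```

The check works on sets, so it gives the same answer however NATS interleaved the events.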
Caveats
- Phase 1/2 GraphQL trigger bridge code remains as reference but is not used alongside the current sensor-based approach
- JetStream max-age (72h) means unconsumed messages are lost if Dagster daemon is down for more than 72 hours. Manual recovery is required in that scenario
- The knowledge_events.event_id generation logic (SHA256(event_type:correlation_id)[:32]) needs care when the same correlation_id produces multiple events of the same type: they hash to the same event_id, so every event after the first is rejected as a duplicate
Verification
- Dagster UI confirms asset materialization status: chat_pair, flow_lineage, tool_call showing as Materialized
- NATS monitoring (:8222) tracks stream state and consumer lag
- PostgreSQL query SELECT * FROM knowledge_events WHERE correlation_id = '...' verifies full-path tracing
- Idempotency test: publishing the same message multiple times produces no duplicate rows in PostgreSQL
Next Steps
Roadmap Phase 1 (real-time pipeline) is currently running; these roadmap phases are separate from the consumer architecture phases described earlier. Planned extensions:
- Automated prompt-response curation using Dagster Asset Materialization
- pg to Parquet to Iceberg batch conversion (Phase 2, devstack ready)
- Automated fine-tuning dataset generation pipeline
For the full roadmap, see the Go-based architecture record.
