Conclusion

Within the Go + NATS + Dagster AI orchestration platform, the Dagster + NATS JetStream event pipeline is the consumption side, fully decoupled from the gateway. This article covers its implementation in detail.

The core design principle: “the gateway only publishes; Dagster sensors pull-subscribe independently.” The initial implementation used a standalone NATS Consumer Worker plus a GraphQL API trigger bridge, which was later consolidated into Dagster native sensors. Table-driven topic-to-job mapping, idempotent PostgreSQL schemas, and correlation_id-based end-to-end tracing form the backbone of this pipeline.


Prerequisites

This article details a pipeline running on the platform described in the full Go-based architecture record.

  • Messaging: NATS 2.11 (JetStream enabled)
  • Orchestration: Dagster (Python) — daemon + sensor + asset materialization
  • Data store: PostgreSQL 18 + pgvector (JIT enabled)
  • Event publisher: agent-gateway (Go/Gin) — fire-and-forget publish

Why Event-Driven

Initially, the Knowledge Service wrote directly to PostgreSQL after generating chat responses. The following problems surfaced.

Problem          | Impact
-----------------|------------------------------------------------------
Response latency | Post-processing time included in user-facing response
Scalability      | Multiple post-processing steps executed sequentially
Observability    | Difficult to identify which step failed and when
Reproducibility  | RAG logic validation and improvement became hard

The fix was to have the gateway only publish events, with Dagster consuming them independently. Post-processing no longer adds to user-facing response latency, and changes to post-processing are invisible to the gateway.


Consumer Architecture Evolution

Phase 1: NATS Consumer Worker + GraphQL Bridge

The initial implementation used a standalone Python process subscribing to NATS and calling Dagster’s GraphQL API to launch jobs.

  agent-gateway → NATS pub → Consumer Worker → GraphQL mutation → Dagster job
  
  import json

  async def message_handler(msg):
      # Decode the NATS message body and hand it to the Dagster launcher.
      payload = json.loads(msg.data.decode())
      submit_dagster_run(
          job_name="knowledge_chat_persist",
          payload=payload,
          correlation_id=payload.get("correlation_id", ""),
      )
  

GraphQL mutation:

  mutation LaunchRun($executionParams: ExecutionParams!) {
    launchRun(executionParams: $executionParams) {
      __typename
      ... on LaunchRunSuccess {
        run { runId }
      }
      ... on RunConfigValidationInvalid {
        errors { message reason }
      }
    }
  }
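
As a rough illustration of how the worker might have packaged this mutation, the sketch below builds the JSON body for a POST to Dagster's GraphQL endpoint. The article does not show the actual submit_dagster_run implementation, so the function name build_launch_request, the repository location and repository names, and the use of a correlation_id run tag are all assumptions.

```python
import json

LAUNCH_RUN_MUTATION = """
mutation LaunchRun($executionParams: ExecutionParams!) {
  launchRun(executionParams: $executionParams) {
    __typename
    ... on LaunchRunSuccess { run { runId } }
    ... on RunConfigValidationInvalid { errors { message reason } }
  }
}
"""


def build_launch_request(job_name, run_config, correlation_id,
                         location="pipeline_location",   # hypothetical code location name
                         repository="__repository__"):   # hypothetical repository name
    """Return the JSON-serializable body for a launchRun call."""
    return {
        "query": LAUNCH_RUN_MUTATION,
        "variables": {
            "executionParams": {
                "selector": {
                    "repositoryLocationName": location,
                    "repositoryName": repository,
                    "jobName": job_name,
                },
                "runConfigData": run_config,
                # Carry the trace ID as a run tag so it can be found from the run (assumption).
                "executionMetadata": {
                    "tags": [{"key": "correlation_id", "value": correlation_id}]
                },
            }
        },
    }


body = build_launch_request("knowledge_chat_persist", {"ops": {}}, "trace-123")
print(json.dumps(body["variables"], indent=2))
```

The response's __typename is what distinguishes a successful launch from a config validation error, so a caller would check it before treating the message as handled.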
  

Problems with this approach:

  • Consumer Worker as a separate process added operational overhead
  • GraphQL API became an intermediate failure point
  • Dagster lineage tracking and consumer state were disconnected

Phase 2: Multi-Job Fan-Out + Table-Driven Design

As topic count grew, conditional routing logic became unwieldy. Table-driven design solved this.

  TOPIC_JOB_MAP = {
      "pipeline.knowledge.chat.persist": [
          ("knowledge_chat_persist", build_audit_run_config),
          ("knowledge_chat_pair_materialize", build_asset_run_config),
      ],
      "pipeline.knowledge.embedding": [
          ("knowledge_embedding", build_event_run_config),
      ],
      "pipeline.knowledge.retrieve": [
          ("knowledge_retrieve", build_event_run_config),
      ],
      "pipeline.knowledge.compose": [
          ("knowledge_compose", build_event_run_config),
      ],
  }
  
  • Adding a topic requires only a table entry; the routing code itself does not change
  • Configuration builders for each job remain independent
  • Fan-out (one message to multiple jobs) expressed declaratively
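
A minimal sketch of the lookup that such a table enables is shown here. The builder bodies and the run_config shapes they return are hypothetical stand-ins, since the article does not show them; only the table layout and job names come from the text.

```python
def build_audit_run_config(payload: dict) -> dict:
    # Hypothetical builder: the real run_config layout is not shown in the article.
    return {"ops": {"persist_audit": {"config": {"payload": payload}}}}


def build_asset_run_config(payload: dict) -> dict:
    # Hypothetical builder for the asset materialization job.
    return {"ops": {"chat_pair": {"config": {"payload": payload}}}}


def build_event_run_config(payload: dict) -> dict:
    # Hypothetical builder shared by the single-job topics.
    return {"ops": {"persist_event": {"config": {"payload": payload}}}}


TOPIC_JOB_MAP = {
    "pipeline.knowledge.chat.persist": [
        ("knowledge_chat_persist", build_audit_run_config),
        ("knowledge_chat_pair_materialize", build_asset_run_config),
    ],
    "pipeline.knowledge.embedding": [
        ("knowledge_embedding", build_event_run_config),
    ],
}


def route(topic: str, payload: dict) -> list:
    """Expand one message into (job_name, run_config) pairs.

    Unknown topics produce an empty list instead of raising.
    """
    return [(job, build(payload)) for job, build in TOPIC_JOB_MAP.get(topic, [])]


launches = route("pipeline.knowledge.chat.persist", {"correlation_id": "c1"})
print([job for job, _ in launches])
```

The dispatcher never branches on topic names, which is why adding a topic leaves it untouched.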

Phase 3: Dagster Native Sensors (Current)

The GraphQL trigger bridge was eliminated in favor of Dagster’s native sensor mechanism.

  agent-gateway → NATS JetStream pub (fire-and-forget)
                      ↓
  Dagster daemon → sensor (pull subscribe, durable consumer)
                      ↓
                  RunRequest → direct job execution → asset materialization
  

Sensors pull subscribe directly from JetStream and issue RunRequests to execute jobs. The Consumer Worker and GraphQL API are no longer needed, reducing the component count.

Active sensors:

Sensor                    | Topic                           | Job
--------------------------|---------------------------------|---------------------------------------------------------
nats_dagster_chat_persist | pipeline.knowledge.chat.persist | knowledge_chat_persist + knowledge_chat_pair_materialize
nats_dagster_embedding    | pipeline.knowledge.embedding    | knowledge_embedding
nats_dagster_retrieve     | pipeline.knowledge.retrieve     | knowledge_retrieve
nats_dagster_compose      | pipeline.knowledge.compose      | knowledge_compose
nats_dagster_flow_lineage | pipeline.knowledge.flow.lineage | knowledge_flow_lineage
nats_dagster_tool_call    | pipeline.knowledge.tool_call    | knowledge_tool_call

The table-driven topic-to-job mapping concept carries directly into the sensor design.


NATS Message Schema

Chat Persistence Message

  {
    "pipeline_id": "knowledge.chat.persist",
    "correlation_id": "unique-trace-id",
    "idempotency_key": "idempotency-unique-id",
    "model": "llm-model-name",
    "prompt": "user-provided-prompt-text",
    "response": "llm-generated-response-text",
    "messages": [{"role": "user", "content": "..."}],
    "usage": {"prompt_tokens": 100, "completion_tokens": 50},
    "requested_at": 1697845200000,
    "payload_json": "additional structured data"
  }
  

Design Points

Field           | Purpose
----------------|------------------------------------------------------------------------------------------------
correlation_id  | End-to-end tracing. Propagates from gateway through NATS, Dagster, PostgreSQL, to analytics queries
idempotency_key | Prevents duplicate processing. Combined with PostgreSQL UNIQUE constraints
payload_json    | Extensibility. Stores additional metadata as JSONB
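
To make the role of these fields concrete, here is a minimal parsing sketch a consumer could run before acting on a message. Which fields are mandatory is an assumption (the article does not say), and ChatPersistMessage and parse_message are hypothetical names.

```python
import json
from dataclasses import dataclass, field

# Assumption: tracing and deduplication fields are treated as mandatory.
REQUIRED_FIELDS = ("pipeline_id", "correlation_id", "idempotency_key")
KNOWN_FIELDS = REQUIRED_FIELDS + ("model", "prompt", "response")


@dataclass(frozen=True)
class ChatPersistMessage:
    pipeline_id: str
    correlation_id: str
    idempotency_key: str
    model: str = ""
    prompt: str = ""
    response: str = ""
    # Everything else (messages, usage, requested_at, payload_json, ...) is kept as-is.
    extra: dict = field(default_factory=dict)


def parse_message(raw: bytes) -> ChatPersistMessage:
    """Decode a NATS message body and reject it if required fields are absent or empty."""
    data = json.loads(raw.decode("utf-8"))
    missing = [name for name in REQUIRED_FIELDS if not data.get(name)]
    if missing:
        raise ValueError("missing required fields: " + ", ".join(missing))
    return ChatPersistMessage(
        **{k: data[k] for k in KNOWN_FIELDS if k in data},
        extra={k: v for k, v in data.items() if k not in KNOWN_FIELDS},
    )


msg = parse_message(
    b'{"pipeline_id": "knowledge.chat.persist", "correlation_id": "c1", '
    b'"idempotency_key": "k1", "usage": {"prompt_tokens": 100}}'
)
print(msg.correlation_id, msg.extra)
```

Rejecting a message that lacks correlation_id or idempotency_key early keeps untraceable or non-deduplicable rows out of PostgreSQL.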

JetStream Stream Configuration

Stream    | Subjects    | Retention | Max Age | Storage
----------|-------------|-----------|---------|--------
PIPELINE  | pipeline.>  | limits    | 72h     | file
TELEMETRY | telemetry.> | limits    | 24h     | file

The PIPELINE stream is consumed by Dagster sensors. The TELEMETRY stream is consumed by Vector (Rust) and forwarded to Prometheus and Loki.


PostgreSQL Schema Design

knowledge_chat_audit Table

Audit log for chat persistence.

  CREATE TABLE knowledge_chat_audit (
      id              BIGSERIAL PRIMARY KEY,
      pipeline_id     TEXT NOT NULL,
      correlation_id  TEXT,
      idempotency_key TEXT,
      model           TEXT,
      prompt          TEXT,
      response        TEXT,
      payload_json    JSONB NOT NULL DEFAULT '{}',
      created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
  );
  

knowledge_events Table (Unified)

Shared event table for multiple knowledge processing steps: embedding, retrieve, and compose.

  CREATE TABLE knowledge_events (
      id             BIGSERIAL PRIMARY KEY,
      event_id       TEXT UNIQUE,          -- SHA256(event_type:correlation_id)[:32]
      event_type     TEXT NOT NULL,        -- 'embedding' | 'retrieve' | 'compose'
      correlation_id TEXT NOT NULL,
      pipeline_id    TEXT NOT NULL,
      llm_model      TEXT,
      payload_json   JSONB DEFAULT '{}'::jsonb,
      created_at     TIMESTAMPTZ DEFAULT NOW()
  );

  CREATE INDEX idx_knowledge_events_type ON knowledge_events (event_type);
  CREATE INDEX idx_knowledge_events_corr ON knowledge_events (correlation_id);
  -- An expression index needs its own parentheses, and a timestamptz-to-date cast
  -- is only immutable once the time zone is pinned (UTC is assumed here).
  CREATE INDEX idx_knowledge_events_date
      ON knowledge_events (((created_at AT TIME ZONE 'UTC')::date));
  

Design decisions:

  • event_id UNIQUE constraint: last line of defense for idempotency. A duplicate insert violates the constraint; paired with ON CONFLICT, it is skipped without raising an error
  • event_type discriminator: keeping events in one table instead of fragmenting them across many simplifies analytics queries
  • Indexes: fast filtering on event_type and correlation_id, date-based aggregation on created_at
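
The event_id derivation noted in the schema comment can be sketched as follows. The hex encoding of the digest, UTF-8 input, and the literal colon separator are assumptions read from the comment SHA256(event_type:correlation_id)[:32]; make_event_id is a hypothetical name.

```python
import hashlib


def make_event_id(event_type: str, correlation_id: str) -> str:
    """Derive a deterministic 32-character ID from the event type and trace ID."""
    digest = hashlib.sha256(f"{event_type}:{correlation_id}".encode("utf-8")).hexdigest()
    return digest[:32]  # first 32 hex characters, i.e. 128 bits of the hash


first = make_event_id("embedding", "trace-123")
again = make_event_id("embedding", "trace-123")
other = make_event_id("retrieve", "trace-123")
print(first == again, first == other)
```

Because the ID depends only on its two inputs, a redelivered message maps to the same event_id and collides with the existing row. The same property means two distinct events of one type that share a correlation_id also receive the same ID.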

Idempotency and Error Handling

Idempotency Design

Layer      | Mechanism
-----------|-------------------------------------------------------------------------
NATS       | JetStream durable consumer retains messages until ACKed
Dagster    | Sensor validates message content before issuing RunRequest
PostgreSQL | UNIQUE constraints on idempotency_key / event_id reject duplicates

All side effects go through UPSERTs (ON CONFLICT) or UNIQUE constraints.
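
The effect of an ON CONFLICT write can be demonstrated with a small sketch. It uses Python's built-in sqlite3 with an in-memory database purely as a stand-in for PostgreSQL so that it runs without a server; the table is trimmed to three columns, and insert_event is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE knowledge_events (
        id             INTEGER PRIMARY KEY,
        event_id       TEXT UNIQUE,
        event_type     TEXT NOT NULL,
        correlation_id TEXT NOT NULL
    )
    """
)


def insert_event(conn, event_id: str, event_type: str, correlation_id: str) -> bool:
    """Insert one event; return False when the event_id already exists."""
    cur = conn.execute(
        "INSERT INTO knowledge_events (event_id, event_type, correlation_id) "
        "VALUES (?, ?, ?) ON CONFLICT (event_id) DO NOTHING",
        (event_id, event_type, correlation_id),
    )
    return cur.rowcount == 1  # 0 rows affected means the duplicate was skipped


results = [insert_event(conn, "evt-1", "embedding", "trace-123") for _ in range(3)]
row_count = conn.execute("SELECT COUNT(*) FROM knowledge_events").fetchone()[0]
print(results, row_count)
```

Against PostgreSQL the statement has the same shape; only the placeholder style changes with the driver (for example %s with psycopg).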

Error Handling

  • NATS message ACK is delayed until Dagster job launch succeeds (in Phase 1/2, until GraphQL call succeeds)
  • Failed messages remain in JetStream’s redelivery queue
  • Dagster job failures are tracked in both the Dagster UI (formerly Dagit) and slog logs
  • correlation_id in all logs enables single-query tracing across gateway, NATS, Dagster, and PostgreSQL
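
The ACK-after-launch rule can be sketched as a small batch loop. This is not the article's sensor code: process_batch and the launch callback are hypothetical, and FakeMsg is a test double that only mimics the subject, data, and ack() members a fetched JetStream message exposes.

```python
import asyncio
import json


async def process_batch(msgs, launch) -> int:
    """Launch a job per message; ACK only after the launch call returns without error."""
    acked = 0
    for msg in msgs:
        corr = "unknown"
        try:
            payload = json.loads(msg.data.decode("utf-8"))
            corr = payload.get("correlation_id", "unknown")
            launch(msg.subject, payload)
        except Exception as exc:
            # No ACK here: the message stays pending and JetStream redelivers it later.
            print(f"launch failed correlation_id={corr} subject={msg.subject} error={exc}")
            continue
        await msg.ack()
        acked += 1
    return acked


class FakeMsg:
    """Stand-in for a fetched JetStream message (test double, not a NATS class)."""

    def __init__(self, subject: str, data: bytes):
        self.subject, self.data, self.acked = subject, data, False

    async def ack(self):
        self.acked = True


def flaky_launch(subject, payload):
    if payload.get("fail"):
        raise RuntimeError("launcher unavailable")


msgs = [
    FakeMsg("pipeline.knowledge.embedding", b'{"correlation_id": "c1"}'),
    FakeMsg("pipeline.knowledge.embedding", b'{"correlation_id": "c2", "fail": true}'),
]
acked = asyncio.run(process_batch(msgs, flaky_launch))
print(acked, [m.acked for m in msgs])
```

One consequence of this shape is that a permanently malformed message is never ACKed, so a real consumer would likely need a delivery limit or an explicit terminate step for such messages.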

Ordering Guarantees

Order across embedding, retrieve, and compose is not guaranteed at the NATS level. Dagster Asset dependencies enforce execution order where needed, and correlation_id enables post-hoc event correlation and validation.
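
Post-hoc validation by correlation_id could look like the sketch below, which groups event rows per trace and flags flows that are incomplete or out of sequence. It assumes every flow is expected to contain all three event types in the order embedding, retrieve, compose, and that created_at is a usable proxy for processing order; neither assumption is stated in the article, and check_flows is a hypothetical name.

```python
from collections import defaultdict

# Assumed expected sequence within one correlation_id.
EXPECTED_ORDER = ("embedding", "retrieve", "compose")


def check_flows(events) -> dict:
    """Return {correlation_id: problem} for flows that are incomplete or out of order.

    events is an iterable of (correlation_id, event_type, created_at) tuples,
    for example the rows of a SELECT over knowledge_events.
    """
    by_corr = defaultdict(dict)
    for corr, event_type, created_at in events:
        by_corr[corr][event_type] = created_at

    problems = {}
    for corr, seen in by_corr.items():
        missing = [t for t in EXPECTED_ORDER if t not in seen]
        if missing:
            problems[corr] = "missing: " + ", ".join(missing)
            continue
        stamps = [seen[t] for t in EXPECTED_ORDER]
        if stamps != sorted(stamps):
            problems[corr] = "out of order"
    return problems


rows = [
    ("c1", "embedding", 1), ("c1", "retrieve", 2), ("c1", "compose", 3),
    ("c2", "retrieve", 5), ("c2", "embedding", 6), ("c2", "compose", 7),
    ("c3", "embedding", 8),
]
print(check_flows(rows))
```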


Caveats

  • Phase 1/2 GraphQL trigger bridge code remains as reference but is not used alongside the current sensor-based approach
  • JetStream max-age (72h) means unconsumed messages are lost if Dagster daemon is down for more than 72 hours. Manual recovery is required in that scenario
  • The knowledge_events.event_id generation logic (SHA256(event_type:correlation_id)[:32]) needs care when the same correlation_id produces multiple events of the same type

Verification

  • Dagster UI confirms asset materialization status: chat_pair, flow_lineage, tool_call showing as Materialized
  • NATS monitoring (:8222) tracks stream state and consumer lag
  • PostgreSQL query SELECT * FROM knowledge_events WHERE correlation_id = '...' verifies full-path tracing
  • Idempotency test: publishing the same message multiple times produces no duplicate rows in PostgreSQL

Next Steps

Phase 1 of the platform roadmap (the real-time pipeline described here) is currently running. Planned extensions:

  • Automated prompt-response curation using Dagster Asset Materialization
  • PostgreSQL → Parquet → Iceberg batch conversion (roadmap Phase 2, devstack ready)
  • Automated fine-tuning dataset generation pipeline

For the full roadmap, see the Go-based architecture record.