Designing and Implementing a Dagster Conversation Lineage, Evaluation, and Dataset Generation System
A design record for building three asset groups on top of the existing agent-gateway Dagster pipeline—Conversation Lineage, Evaluation, and Training Dataset Generation—treating conversations as replayable branching graphs. Covers the three-tier storage strategy, four critical design problems discovered during code review, and a subsequent architectural pivot.
Conclusion
agent-gateway already had a working event pipeline built on Go, NATS, and Dagster. On top of that foundation, I designed and implemented three asset groups that treat LLM conversations as replayable branching graphs.
The core of the design is separation of concerns. PostgreSQL is the truth source for the lineage data that replay requires, DuckDB is the analytics workbench for evaluation and clustering, and Iceberg / Parquet holds immutable, frozen dataset snapshots. Because each layer has a single role, its design decisions remain independent of the others.
During the implementation phase, code review revealed four critical design problems—grouping breakdown, missing metadata, turn_index collision, and unimplemented core assets—each requiring fixes.
Background
State of the Existing Pipeline
agent-gateway is LLM infrastructure built around an OpenAI-compatible proxy gateway written in Go with Gin. It integrates NATS JetStream for event streaming, Dagster for pipeline processing, PostgreSQL + pgvector for vector search, and DuckDB as an analytics workbench.
The existing pipeline_runtime asset group ran 11 Dagster assets. A NATS sensor picked up events, and the processing flow wrote them from pipeline_event_inbox into analytics tables such as llm_chat_turns, flow_nodes, and flow_edges.
What Was Missing
The existing pipeline covered only normalization and accumulation of individual request events. The following were absent:
- A mechanism to treat entire conversations as branchable graphs
- Trajectory evaluation
- Automated training dataset generation
To build an LLM data stack for fine-tuning local OSS LLMs, model evaluation, and system prompt optimization, these needed to be designed and implemented on top of Dagster.
Designing the Three Asset Groups
Conversation Lineage Assets
The asset group representing the conversation tree itself.
| Asset | Role |
|---|---|
| seed_themes | Reference table of experiment themes |
| conversation_sessions | Session management |
| conversation_branch_nodes | Holds all nodes in the conversation tree |
| taskification_results | Intermediate results from MCP and taskification |
| execution_run_records | Execution facts |
| response_branch_records | Final responses and traces |
Each node holds a branch_kind (root, retry, tool_variant, sampling_variant, prompt_variant, taskify_variant) and variant_config as JSON.
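The sketch below shows how a node record could carry branch_kind and variant_config in Python. It assumes variant_config is stored as a JSON text column. The names BranchKind, BranchNode, node_from_row, and node_to_row are hypothetical illustrations, not code from model-foundry.

```python
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class BranchKind(str, Enum):
    """The six branch kinds named in the design."""

    ROOT = "root"
    RETRY = "retry"
    TOOL_VARIANT = "tool_variant"
    SAMPLING_VARIANT = "sampling_variant"
    PROMPT_VARIANT = "prompt_variant"
    TASKIFY_VARIANT = "taskify_variant"


@dataclass
class BranchNode:
    # Hypothetical minimal field set; only branch_kind and variant_config come from the design.
    branch_node_id: str
    parent_node_id: Optional[str]
    branch_kind: BranchKind
    variant_config: dict[str, Any] = field(default_factory=dict)


def node_from_row(row: dict[str, Any]) -> BranchNode:
    """Parse a stored row whose variant_config column holds JSON text."""
    kind = BranchKind(row["branch_kind"])  # unknown kinds raise ValueError here
    return BranchNode(
        branch_node_id=row["branch_node_id"],
        parent_node_id=row.get("parent_node_id"),
        branch_kind=kind,
        variant_config=json.loads(row.get("variant_config") or "{}"),
    )


def node_to_row(node: BranchNode) -> dict[str, Any]:
    """Serialize for storage; sort_keys keeps the JSON text stable across writes."""
    return {
        "branch_node_id": node.branch_node_id,
        "parent_node_id": node.parent_node_id,
        "branch_kind": node.branch_kind.value,
        "variant_config": json.dumps(node.variant_config, sort_keys=True),
    }
```

Backing the enum with `str` means an unknown branch_kind fails at parse time, before it can reach downstream assets.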
The dependency graph is a serial chain:
```
seed_themes
  → conversation_sessions
  → conversation_branch_nodes
  → taskification_results
  → execution_run_records
  → response_branch_records
```
Replay Rules
- When taskify_rule_version changes: regenerate from taskification_results onward
- When model_snapshot_id changes: regenerate from execution_run_records onward
- When a specific node’s conditions change: regenerate only that subtree
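The replay rules reduce to two operations: taking a downstream slice of the serial chain, and walking a subtree. The sketch below assumes that a change is identified by the field names above and that the tree is available as a parent-to-children map. assets_to_regenerate and subtree are hypothetical helpers.

```python
# Serial dependency chain of the Conversation Lineage group, upstream to downstream.
LINEAGE_CHAIN = [
    "seed_themes",
    "conversation_sessions",
    "conversation_branch_nodes",
    "taskification_results",
    "execution_run_records",
    "response_branch_records",
]

# Replay rules: the first asset each kind of change invalidates.
REGENERATE_FROM = {
    "taskify_rule_version": "taskification_results",
    "model_snapshot_id": "execution_run_records",
}


def assets_to_regenerate(changed: str) -> list[str]:
    """Return the invalidated asset and everything downstream of it."""
    start = LINEAGE_CHAIN.index(REGENERATE_FROM[changed])
    return LINEAGE_CHAIN[start:]


def subtree(node_id: str, children: dict[str, list[str]]) -> list[str]:
    """Collect node_id and all of its descendants (iterative preorder DFS)."""
    out: list[str] = []
    stack = [node_id]
    while stack:
        current = stack.pop()
        out.append(current)
        # Push children reversed so siblings come out in their stored order.
        stack.extend(reversed(children.get(current, [])))
    return out
```

A node-level change therefore touches only `subtree(node_id, ...)`, while a rule-version or model change re-runs a suffix of the chain for every session.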
Evaluation and Distribution Labeling Assets
The asset group that converts conversation trajectories into evaluable units.
```
trajectory_units                 # Normalize paths from root to terminal node
  → trajectory_embeddings        # Vectorize
  → trajectory_clusters          # Cluster
  → cluster_category_labels      # Assign category labels
  → human_sample_queue           # Human review queue
  → human_eval_labels            # Accumulate evaluation results
  → candidate_preference_pairs
  → preference_labels            # Build DPO pairs
```
Example category labels defined: tool_helped, tool_misuse, hallucination, good_recovery, overlong, underspecified, stable_high_quality.
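Here is a minimal sketch of the normalization step behind trajectory_units: enumerating every path from the root to a terminal node. It assumes one conversation tree is given as a child-to-parent map. The function name and input shape are illustrative, not the project's actual code.

```python
from typing import Optional


def trajectory_units(parent_of: dict[str, Optional[str]]) -> list[list[str]]:
    """Enumerate every root-to-terminal path in one conversation tree.

    parent_of maps each node id to its parent id (None for the root).
    A terminal node is any node that no other node names as its parent.
    """
    has_child = {p for p in parent_of.values() if p is not None}
    paths = []
    for node in sorted(parent_of):  # sorted so the output order is deterministic
        if node in has_child:
            continue
        path = []
        current: Optional[str] = node
        while current is not None:
            path.append(current)
            if len(path) > len(parent_of):
                raise ValueError("cycle in parent links")
            current = parent_of[current]
        paths.append(path[::-1])  # leaf-to-root walk reversed into root-to-leaf
    return paths
```

Each returned path is one evaluable unit. A tree with n terminal nodes yields n trajectories that share prefixes up to their branch points.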
Design Principles
- Embeddings are representations, not labels
- Clusters are structure, not judgments
- Model-based categorization and human preference are explicitly separated into distinct layers
Training Dataset Generation Assets
The asset group that builds training datasets from evaluated trajectories.
```
positive_topk_trajectories      # Stratified top-K extraction
negative_bottomk_trajectories   # Stratified bottom-K extraction
  → sft_messages_dataset        # SFT dataset
  → dpo_preference_dataset      # DPO dataset
  → suppression_dataset         # Suppression dataset
  → unlearning_dataset          # Unlearning dataset
  → dataset_snapshot_registry   # Metadata for frozen snapshots
```
Negative types defined: hard_negative, tool_misuse, hallucination, noise_reject, suppression_candidate. Rather than treating all negatives uniformly for unlearning, they are separated by type.
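The sketch below illustrates stratified top-K / bottom-K extraction. The design does not say which strata or scores are used. Here a stratum is assumed to be a cluster_category_labels value and the score a single aggregated quality number. ScoredTrajectory and stratified_extremes are hypothetical names.

```python
from collections import defaultdict
from typing import NamedTuple


class ScoredTrajectory(NamedTuple):
    trajectory_id: str
    stratum: str  # assumption: e.g. a cluster_category_labels value
    score: float  # assumption: an aggregated quality score


def stratified_extremes(
    rows: list[ScoredTrajectory], k: int
) -> tuple[list[ScoredTrajectory], list[ScoredTrajectory]]:
    """Pick the top-k and bottom-k trajectories within each stratum."""
    by_stratum: dict[str, list[ScoredTrajectory]] = defaultdict(list)
    for row in rows:
        by_stratum[row.stratum].append(row)

    positives: list[ScoredTrajectory] = []
    negatives: list[ScoredTrajectory] = []
    for stratum in sorted(by_stratum):
        # Highest score first; ties broken by id so snapshots are reproducible.
        ranked = sorted(by_stratum[stratum], key=lambda r: (-r.score, r.trajectory_id))
        top = ranked[:k]
        top_ids = {r.trajectory_id for r in top}
        # In small strata, never reuse a positive as a negative.
        bottom = [r for r in reversed(ranked) if r.trajectory_id not in top_ids][:k]
        positives.extend(top)
        negatives.extend(bottom)
    return positives, negatives
```

Stratifying first keeps a large, uniformly good cluster from crowding every other category out of the SFT set. It also gives each category its own share of negatives.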
Key Design Decisions
Active Orchestrator vs. Passive Capture
An early question was whether seed_themes → conversation_sessions → branch_nodes would actively drive LLM conversations programmatically, or passively organize existing ones.
The conclusion was to start with passive capture: manually define themes and issue session IDs, iterate on LLM interactions through both improvements and failures, and then materialize conversation assets from those session IDs. In other words, “replay from an arbitrary node” requires a replay substrate first; an active orchestrator can be built on top of it later.
Where to Place Branching Logic
Rather than discarding the existing session / session_turn / session_lineage tables, I decided to use them as an execution log substrate, building the experimental semantic layer as assets on top.
The existing parent-child turn structure needed these additional fields:
- branch_kind
- variant_config
- node_status
- context_snapshot_id
- taskification replay point
taskification_results is distinct from the existing FlowLineageEvent and is kept independent as an intermediate representation for prompt decomposition and orchestration plans.
Three-Tier Storage Structure
| Layer | Storage | Purpose |
|---|---|---|
| Truth source | PostgreSQL | Lineage data required for replay |
| Analytics workbench | DuckDB | Derived data for evaluation and clustering |
| Immutable artifact | Iceberg / Parquet | Frozen dataset snapshots |
This separation means changing the lineage storage method does not affect the evaluation pipeline, and changing the dataset format does not ripple back to lineage.
Implementation Priority
Rather than proceeding group by group, I broke down the actual order of work into five stages:
- Establish a replayable lineage model
- Introduce taskification and branch generation
- Trajectory normalization
- Embedding / clustering / labeling
- Dataset generation and snapshot freeze
The highest priority is replay using local OSS LLMs and normalizing branch nodes. Without this foundation, both the evaluation assets and dataset assets downstream become unstable.
Implementation Plan
Directory Structure
New code is added under the existing model-foundry/src/loftllc/ package as a module parallel to pipeline_runtime, following the existing domain / infra / presentation layering:
```
loftllc/
  domain/
    conversation_lineage.py
  infra/
    lineage_store.py
  presentation/
    dagster/
      assets/
        conversation_lineage.py
```
Difference in Asset Execution Patterns
The existing pipeline_runtime assets use single-event-driven execution (one RunRequest per NATS message), but lineage assets are fundamentally different:
- NATS events are already processed by pipeline_runtime and accumulated in llm_chat_turns and flow_nodes
- Lineage assets operate as downstream consumers reading from those tables
- Session-accumulated data is materialized in batch
Materialization strategy:
| Group | Trigger |
|---|---|
| Group 1 (Conversation Lineage) | Manual or 60-second interval sensor |
| Group 2 (Evaluation) | Auto-triggered via dependency on Group 1 |
| Group 3 (Dataset Generation) | Fully manual |
Partition Strategy
DynamicPartitionsDefinition is used. Session IDs are generated at runtime by the Go gateway and cannot be pre-declared. The sensor discovers new session IDs from llm_chat_turns, adds them to the dynamic partition set, and then issues a RunRequest. Group 3 uses a separate dynamic partition keyed by snapshot ID.
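The discovery logic of that sensor can be kept Dagster-free and testable. The sketch below assumes llm_chat_turns rows arrive as (session_id, created_at) pairs and that the watermark is a created_at timestamp persisted in lineage_watermark. discover_sessions, DiscoveryResult, and sessions_partitions are hypothetical names. The trailing comment shows how the result would likely map onto Dagster's SensorResult and RunRequest.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable


@dataclass(frozen=True)
class DiscoveryResult:
    new_session_ids: list[str]  # keys to add to the dynamic partition set
    watermark: datetime  # value to persist, e.g. in lineage_watermark


def discover_sessions(
    rows: Iterable[tuple[str, datetime]],
    known: set[str],
    watermark: datetime,
) -> DiscoveryResult:
    """Dagster-free core of one sensor tick.

    rows: (session_id, created_at) pairs read from llm_chat_turns.
    known: session IDs already registered as dynamic partitions.
    watermark: created_at of the newest row seen on the previous tick.
    """
    # Strict '>' means a row committed later with a timestamp equal to the
    # watermark would be missed; a real query may need an overlap window.
    fresh = [(sid, ts) for sid, ts in rows if ts > watermark]
    new_ids = sorted({sid for sid, _ in fresh} - known)
    new_watermark = max((ts for _, ts in fresh), default=watermark)
    return DiscoveryResult(new_ids, new_watermark)


# Inside a Dagster sensor (e.g. @sensor(minimum_interval_seconds=60)), the result
# could be returned roughly as:
#   SensorResult(
#       run_requests=[RunRequest(partition_key=s) for s in result.new_session_ids],
#       dynamic_partitions_requests=[sessions_partitions.build_add_request(result.new_session_ids)],
#   )
```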
Replay Mechanism
“Replay from node X” means:
- Retrieve node X’s conversation state (frozen message history)
- Send that message history to one or more LLMs
- Record the new responses as response_branch_records
A new infra resource LLMClientResource is added, sending HTTP requests to the gateway’s /v1/chat/completions endpoint. This routes replay through the gateway as well, recording the same routing, logging, and NATS events.
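A dependency-free sketch of such a client, plus the replay loop it serves, follows. Assumptions: the gateway returns the OpenAI-compatible `choices[0].message.content` shape, and the base URL is configured per environment. A real Dagster resource would more likely subclass `dagster.ConfigurableResource`; a plain dataclass keeps this example self-contained. replay_from_node and the record fields other than context_anchor_turn_id are hypothetical.

```python
import json
import urllib.request
from dataclasses import dataclass
from typing import Any, Callable

Messages = list[dict[str, str]]


@dataclass
class LLMClientResource:
    """Hypothetical client that sends replay traffic through the gateway."""

    base_url: str  # assumption, e.g. "http://localhost:8080"
    timeout_s: float = 60.0

    def build_request(self, model: str, messages: Messages) -> urllib.request.Request:
        body = json.dumps({"model": model, "messages": messages}).encode("utf-8")
        return urllib.request.Request(
            f"{self.base_url.rstrip('/')}/v1/chat/completions",
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )

    def chat(self, model: str, messages: Messages) -> dict[str, Any]:
        req = self.build_request(model, messages)
        with urllib.request.urlopen(req, timeout=self.timeout_s) as resp:
            return json.load(resp)


def replay_from_node(
    anchor_turn_id: str,
    message_history: Messages,
    models: list[str],
    chat: Callable[[str, Messages], dict[str, Any]],
) -> list[dict[str, Any]]:
    """Send one frozen history to several models; one record per response."""
    records = []
    for model in models:
        reply = chat(model, message_history)
        records.append({
            "context_anchor_turn_id": anchor_turn_id,
            "model": model,
            "response_text": reply["choices"][0]["message"]["content"],
        })
    return records
```

Passing `chat` as a callable (for example `LLMClientResource(...).chat`) lets the replay loop be tested without a running gateway.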
PostgreSQL Schema Design
Lineage tables defined:
| Table | Role |
|---|---|
| seed_themes | Experiment theme reference table |
| conversation_sessions | Session view aggregated from llm_chat_turns |
| conversation_branch_nodes | Branch points in the conversation (branch_node_id, parent_node_id, turn_index, message_history_json, branch_reason) |
| taskification_results | Decomposed sub-prompts |
| execution_run_records | LLM execution facts (original or replay) |
| response_branch_records | Comparative records for the same prompt (branching origin tracked via context_anchor_turn_id) |
| lineage_watermark | Watermark for incremental processing |
Design Problems Found in Code Review
After implementing Phase 1 (conversation lineage foundation), code review revealed four critical problems.
Problem 1: Grouping Breakdown in response_branch_records
A new turn_id was assigned per replay, but branch grouping was done per turn_id, making each replay a separate group and leaving branch_index effectively at 0. This was inconsistent with the spec’s “shared parent / controlled branches” comparison model, making side-by-side comparison and preference pair construction impossible.
Fix: Switch grouping to be based on context_anchor_turn_id.
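The difference between the two grouping keys is easy to reproduce. In this sketch, replay records are plain dicts with turn_id, context_anchor_turn_id, and a created_at ordering value (an assumption). Grouping by turn_id produces singleton groups, while grouping by context_anchor_turn_id numbers the replays as siblings. assign_branch_indices is a hypothetical helper.

```python
from collections import defaultdict
from typing import Any


def assign_branch_indices(records: list[dict[str, Any]], key: str) -> list[dict[str, Any]]:
    """Group response records by `key` and number branches within each group.

    Within a group, records are ordered by (created_at, turn_id) so indices are stable.
    """
    groups: dict[Any, list[dict[str, Any]]] = defaultdict(list)
    for rec in records:
        groups[rec[key]].append(rec)
    out = []
    for group_key in sorted(groups):
        ordered = sorted(groups[group_key], key=lambda r: (r["created_at"], r["turn_id"]))
        for index, rec in enumerate(ordered):
            out.append({**rec, "branch_group": group_key, "branch_index": index})
    return out
```

With three replays of the same anchor, keying on turn_id leaves every branch_index at 0. Keying on context_anchor_turn_id yields 0, 1, 2 within one comparable group.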
Problem 2: Missing Metadata in original turn execution_run_records
Only role / content / created_at were fetched from chat_history, but the builder assumed model / backend / assistant_message / token_usage / duration_ms would be present. As a result, response_text and usage for original runs were empty.
The actual metadata existed in llm_chat_turns (DuckDB side) or pipeline_event_inbox (PostgreSQL side), requiring a fix to fetch from the correct source.
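One way to make this failure visible is to build original-run records from an explicit join and refuse to emit a record with missing metadata. The sketch assumes the chat_history row and its llm_chat_turns counterpart share a turn_id key; that join key and the function name are hypothetical.

```python
from typing import Any, Optional

# Fields the execution-record builder expects; per the review, they live in llm_chat_turns.
METADATA_FIELDS = ("model", "backend", "assistant_message", "token_usage", "duration_ms")


def build_original_run_record(
    chat_row: dict[str, Any], turn_meta: Optional[dict[str, Any]]
) -> dict[str, Any]:
    """Merge a chat_history row with its llm_chat_turns metadata.

    Raises instead of silently producing empty response_text / usage.
    """
    if turn_meta is None:
        raise LookupError(f"no llm_chat_turns row for turn {chat_row['turn_id']}")
    missing = [f for f in METADATA_FIELDS if turn_meta.get(f) is None]
    if missing:
        raise ValueError(f"turn {chat_row['turn_id']} lacks metadata: {missing}")
    return {
        "turn_id": chat_row["turn_id"],
        "created_at": chat_row["created_at"],
        "model": turn_meta["model"],
        "backend": turn_meta["backend"],
        "response_text": turn_meta["assistant_message"],
        "usage": turn_meta["token_usage"],
        "duration_ms": turn_meta["duration_ms"],
    }
```

Failing loudly trades some availability for correctness. A silently empty response_text would otherwise flow into evaluation and preference pairs unnoticed.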
Problem 3: turn_index Collision in Replay Branches
Replay branches had turn_index = parent_turn_index + 1 hardcoded, colliding with the existing next turn. Since reads used only ORDER BY turn_index ASC, ordering between sibling branches and existing serial turns became unstable, breaking the premise that “any node can be a replay root.”
Fix: Abandon the turn_index-based serial model and switch to a parent-child tree structure via the branch node table.
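With parent pointers, the context for any node is rebuilt by walking its ancestors, so sibling branches that share a turn_index cannot interfere. This sketch assumes each node stores its parent_node_id and a single message; history_for is a hypothetical helper.

```python
from typing import Any, Optional


def history_for(node_id: str, nodes: dict[str, dict[str, Any]]) -> list[dict[str, str]]:
    """Rebuild the message history ending at node_id by following parent links.

    Only ancestors of node_id are visited, so siblings with a colliding
    turn_index never leak into the result.
    """
    chain: list[dict[str, str]] = []
    seen: set[str] = set()
    current: Optional[str] = node_id
    while current is not None:
        if current in seen:
            raise ValueError(f"cycle detected at {current}")
        seen.add(current)
        node = nodes[current]
        chain.append(node["message"])
        current = node["parent_node_id"]
    chain.reverse()  # collected leaf-to-root; return root-to-leaf
    return chain
```

In a test tree, a replay branch r1 and the original n2 both sit at turn_index 1 under the same parent. ORDER BY turn_index would interleave them, whereas the parent walk returns exactly one path for each.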
Problem 4: Unimplemented Core Assets
Materialization for conversation_sessions and conversation_branch_nodes was missing, meaning depth / branch_kind / context_snapshot_id / variant_config / node_status from the spec were never stored, leaving the system insufficient as a lineage-aware branching graph.
Remaining Issues After Initial Fix
Two points remained unresolved after the first round of fixes:
- Whether to exclude replay nodes during normal materialization, or to skip turns with no original source in build_execution_record_from_event()
- conversation_branch_nodes requires 2-pass construction for parent references
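Two passes are needed because a child row can be processed before its parent's node exists. The sketch below assumes turn rows carry turn_id and parent_turn_id and may arrive in any order. Pass 1 registers every node. Pass 2 resolves parent references and computes depth iteratively, which avoids recursion limits on long conversations. build_branch_nodes and the `node-` id scheme are hypothetical.

```python
from typing import Any, Optional


def build_branch_nodes(turns: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Two-pass construction of branch nodes from turn rows in any order."""
    # Pass 1: register every node; the id scheme is a hypothetical placeholder.
    node_id_of = {t["turn_id"]: f"node-{t['turn_id']}" for t in turns}
    parent_turn: dict[str, Optional[str]] = {t["turn_id"]: t["parent_turn_id"] for t in turns}

    # Pass 2a: depth, by walking up to the nearest ancestor whose depth is known.
    depth: dict[str, int] = {}
    for turn_id in parent_turn:
        path: list[str] = []
        on_path: set[str] = set()
        current: Optional[str] = turn_id
        while current is not None and current not in depth:
            if current in on_path:
                raise ValueError(f"cycle through {current}")
            path.append(current)
            on_path.add(current)
            current = parent_turn[current]  # KeyError if a parent turn is missing
        base = -1 if current is None else depth[current]
        for offset, tid in enumerate(reversed(path), start=1):
            depth[tid] = base + offset

    # Pass 2b: emit nodes with resolved parent references.
    return [
        {
            "branch_node_id": node_id_of[t["turn_id"]],
            "parent_node_id": None if t["parent_turn_id"] is None else node_id_of[t["parent_turn_id"]],
            "depth": depth[t["turn_id"]],
        }
        for t in turns
    ]
```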
Final fix approach:
- Include the topology columns (parent_node_id, depth, branch_kind) in the conflict update of upsert_branch_node()
- Populate status / error / tool_trace from pipeline_event for original runs as well
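Here is a sketch of what the conflict clause in upsert_branch_node() could look like. SQLite stands in for PostgreSQL so the example runs without a server; both accept `INSERT ... ON CONFLICT (...) DO UPDATE SET col = excluded.col` (SQLite 3.24+). The column set is reduced to the topology columns, and the function body is illustrative, not the project's code.

```python
import sqlite3
from typing import Any

CREATE_BRANCH_NODES = """
CREATE TABLE IF NOT EXISTS conversation_branch_nodes (
    branch_node_id TEXT PRIMARY KEY,
    parent_node_id TEXT,
    depth INTEGER NOT NULL,
    branch_kind TEXT NOT NULL
)
"""

# Topology columns are overwritten on conflict, so a later pass can correct them.
# With psycopg against PostgreSQL, the placeholders would be %s instead of ?.
UPSERT_BRANCH_NODE = """
INSERT INTO conversation_branch_nodes (branch_node_id, parent_node_id, depth, branch_kind)
VALUES (?, ?, ?, ?)
ON CONFLICT (branch_node_id) DO UPDATE SET
    parent_node_id = excluded.parent_node_id,
    depth = excluded.depth,
    branch_kind = excluded.branch_kind
"""


def upsert_branch_node(conn: sqlite3.Connection, node: dict[str, Any]) -> None:
    conn.execute(
        UPSERT_BRANCH_NODE,
        (node["branch_node_id"], node["parent_node_id"], node["depth"], node["branch_kind"]),
    )
```

Without the topology columns in the `SET` list, a node first written with a placeholder parent would keep that stale parent forever. That is the failure the fix addresses.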
The Core Design Principle
In one sentence, this system is a foundation for treating LLM conversations as replayable branching graphs.
The existing session / session_turn / session_lineage serve as the execution log substrate, with five experimental asset tables built on top. The goal is to perform controlled replay using any node as a context anchor, collect and evaluate response distributions, and ultimately connect to training datasets.
taskification_results is distinct from existing flow lineage and stands alone as an intermediate representation for decomposing prompts into subtask orchestration plans.
The three-tier storage separation (PostgreSQL / DuckDB / Iceberg) keeps each layer’s design decisions independent.
Results
- Designed three asset groups—Conversation Lineage, Evaluation & Distribution Labeling, Training Dataset Generation—adding approximately 30 assets in total
- Finalized a three-tier storage strategy: PostgreSQL / DuckDB / Iceberg
- Completed Phase 1 (conversation lineage foundation) implementation
- Discovered and fixed four critical design problems through code review
- Clarified remaining issues: branch node 2-pass construction, context_anchor-based grouping, replay node exclusion logic
- Established implementation priority as five stages: replay foundation → taskification → trajectory normalization → evaluation → dataset generation
Subsequent Pivot
During implementation and operation of this design, the direction shifted in several ways.
Adding Telemetry and Pivoting to a Visualization Pipeline
One shortcoming of expressing conversation lineage through Dagster’s asset graph was that it was hard for humans to grasp the real-time state. I added a telemetry layer and settled on routing metrics through Vector → Prometheus → Grafana. Drawing conversation state, branching, and evaluation results as Grafana dashboards made it possible to observe LLM-to-LLM interactions in real time in a human-readable form, without them becoming a black box.
Flattening Dagster Assets
With visualization moved to the telemetry side, there was no longer a strong reason to maintain complex Dagster asset structures. Rather than expressing the conversation tree as a deep dependency graph, I pivoted to flattening the assets.
Redesigning agent-gateway as an Orchestrator Model
A larger change was overhauling the agent-gateway architecture itself. The structure shifted from a single gateway processing all requests to a system where an orchestrator model drives a set of subordinate worker LLM models. With LLMs interacting with each other to process tasks, materializing those interaction flows as assets and visualizing them in Grafana in real time became important.
The details of this redesign are planned for a separate write-up.
