Conclusion

agent-gateway already had a working event pipeline implementation built on Go + NATS + Dagster. On top of that foundation, I designed and implemented three asset groups to treat LLM conversations as replayable branching graphs.

The core of the design is separation of concerns. By treating PostgreSQL as the truth source for lineage data required for replay, DuckDB as the analytics workbench for evaluation and clustering, and Iceberg / Parquet as immutable frozen dataset snapshots, each layer’s design decisions remain independent.

During the implementation phase, code review revealed four critical design problems—grouping breakdown, missing metadata, turn_index collision, and unimplemented core assets—each requiring fixes.


Background

State of the Existing Pipeline

agent-gateway is an LLM infrastructure built around an OpenAI-compatible proxy gateway in Go + Gin, integrating NATS JetStream event streaming, Dagster pipeline processing, PostgreSQL + pgvector for vector search, and DuckDB as an analytics workbench.

The existing pipeline_runtime asset group had 11 Dagster assets running. A NATS sensor picked up events and a processing flow was established to write from pipeline_event_inbox into analytics tables like llm_chat_turns, flow_nodes, and flow_edges.

What Was Missing

The existing pipeline covered only normalization and accumulation of individual request events. The following were absent:

  • A mechanism to treat entire conversations as branchable graphs
  • Trajectory evaluation
  • Automated training dataset generation

To build an LLM data stack for fine-tuning local OSS LLMs, model evaluation, and system prompt optimization, these needed to be designed and implemented on top of Dagster.


Designing the Three Asset Groups

Conversation Lineage Assets

The asset group representing the conversation tree itself.

AssetRole
seed_themesReference table of experiment themes
conversation_sessionsSession management
conversation_branch_nodesHolds all nodes in the conversation tree
taskification_resultsIntermediate results from MCP and taskification
execution_run_recordsExecution facts
response_branch_recordsFinal responses and traces

Each node holds a branch_kind (root, retry, tool_variant, sampling_variant, prompt_variant, taskify_variant) and variant_config as JSON.

The dependency graph is a serial chain:

  seed_themes
  → conversation_sessions
    → conversation_branch_nodes
      → taskification_results
        → execution_run_records
          → response_branch_records
  

Replay Rules

  • When taskify_rule_version changes: regenerate from taskification_results onward
  • When model_snapshot_id changes: regenerate from execution_run_records onward
  • When a specific node’s conditions change: regenerate only that subtree

Evaluation and Distribution Labeling Assets

The asset group that converts conversation trajectories into evaluable units.

  trajectory_units          # Normalize paths from root to terminal node
  → trajectory_embeddings # Vectorize
    → trajectory_clusters # Cluster
      → cluster_category_labels  # Assign category labels
        → human_sample_queue     # Human review queue
          → human_eval_labels    # Accumulate evaluation results
            → candidate_preference_pairs
              → preference_labels  # Build DPO pairs
  

Example category labels defined: tool_helped, tool_misuse, hallucination, good_recovery, overlong, underspecified, stable_high_quality.

Design Principles

  • Embeddings are representations, not labels
  • Clusters are structure, not judgments
  • Model-based categorization and human preference are explicitly separated into distinct layers

Training Dataset Generation Assets

The asset group that builds training datasets from evaluated trajectories.

  positive_topk_trajectories     # Stratified top-K extraction
negative_bottomk_trajectories  # Stratified bottom-K extraction
  → sft_messages_dataset       # SFT dataset
  → dpo_preference_dataset     # DPO dataset
  → suppression_dataset        # Suppression dataset
  → unlearning_dataset         # Unlearning dataset
    → dataset_snapshot_registry  # Metadata for frozen snapshots
  

Negative types defined: hard_negative, tool_misuse, hallucination, noise_reject, suppression_candidate. Rather than treating all negatives uniformly for unlearning, they are separated by type.


Key Design Decisions

Active Orchestrator vs. Passive Capture

An early question was whether seed_themes → conversation_sessions → branch_nodes would actively drive LLM conversations programmatically, or passively organize existing ones.

The conclusion: first, manually define themes and issue session IDs, then iterate LLM interactions through improvements and failures. Then materialize conversation assets from those session IDs. “Replay from an arbitrary node” means a replay substrate first, with an active orchestrator built on top later.

Where to Place Branching Logic

Rather than discarding the existing session / session_turn / session_lineage tables, I decided to use them as an execution log substrate, building the experimental semantic layer as assets on top.

The existing parent-child turn structure needed these additional fields:

  • branch_kind
  • variant_config
  • node_status
  • context_snapshot_id
  • taskification replay point

taskification_results is distinct from the existing FlowLineageEvent and is kept independent as an intermediate representation for prompt decomposition and orchestration plans.

Three-Tier Storage Structure

LayerStoragePurpose
Truth sourcePostgreSQLLineage data required for replay
Analytics workbenchDuckDBDerived data for evaluation and clustering
Immutable artifactIceberg / ParquetFrozen dataset snapshots

This separation means changing the lineage storage method does not affect the evaluation pipeline, and changing the dataset format does not ripple back to lineage.

Implementation Priority

Rather than proceeding group by group, I broke down the actual order of work into five stages:

  1. Establish a replayable lineage model
  2. Introduce taskification and branch generation
  3. Trajectory normalization
  4. Embedding / clustering / labeling
  5. Dataset generation and snapshot freeze

The highest priority is replay using local OSS LLMs and normalizing branch nodes. Without this foundation, both the evaluation assets and dataset assets downstream become unstable.


Implementation Plan

Directory Structure

New code is added under the existing model-foundry/src/loftllc/ package as a module parallel to pipeline_runtime, following the existing domain / infra / presentation layering:

  loftllc/
  domain/
    conversation_lineage.py
  infra/
    lineage_store.py
  presentation/
    dagster/
      assets/
        conversation_lineage.py
  

Difference in Asset Execution Patterns

The existing pipeline_runtime assets use single-event-driven execution (one RunRequest per NATS message), but lineage assets are fundamentally different:

  • NATS events are already processed by pipeline_runtime and accumulated in llm_chat_turns and flow_nodes
  • Lineage assets operate as downstream consumers reading from those tables
  • Session-accumulated data is materialized in batch

Materialization strategy:

GroupTrigger
Group 1 (Conversation Lineage)Manual or 60-second interval sensor
Group 2 (Evaluation)Auto-triggered via dependency on Group 1
Group 3 (Dataset Generation)Fully manual

Partition Strategy

DynamicPartitionsDefinition is used. Session IDs are generated at runtime by the Go gateway and cannot be pre-declared. The sensor discovers new session IDs from llm_chat_turns, adds them to the dynamic partition set, and then issues a RunRequest. Group 3 uses a separate dynamic partition keyed by snapshot ID.

Replay Mechanism

“Replay from node X” means:

  1. Retrieve node X’s conversation state (frozen message history)
  2. Send that message history to one or more LLMs
  3. Record new responses as response_branch_records

A new infra resource LLMClientResource is added, sending HTTP requests to the gateway’s /v1/chat/completions endpoint. This routes replay through the gateway as well, recording the same routing, logging, and NATS events.

PostgreSQL Schema Design

Lineage tables defined:

TableRole
seed_themesExperiment theme reference table
conversation_sessionsSession view aggregated from llm_chat_turns
conversation_branch_nodesBranch points in the conversation (branch_node_id, parent_node_id, turn_index, message_history_json, branch_reason)
taskification_resultsDecomposed sub-prompts
execution_run_recordsLLM execution facts (original or replay)
response_branch_recordsComparative records for the same prompt (branching origin tracked via context_anchor_turn_id)
lineage_watermarkWatermark for incremental processing

Design Problems Found in Code Review

After implementing Phase 1 (conversation lineage foundation), code review revealed four critical problems.

Problem 1: Grouping Breakdown in response_branch_records

A new turn_id was assigned per replay, but branch grouping was done per turn_id, making each replay a separate group and leaving branch_index effectively at 0. This was inconsistent with the spec’s “shared parent / controlled branches” comparison model, making side-by-side comparison and preference pair construction impossible.

Fix: Switch grouping to be based on context_anchor_turn_id.

Problem 2: Missing Metadata in original turn execution_run_records

Only role / content / created_at were fetched from chat_history, but the builder assumed model / backend / assistant_message / token_usage / duration_ms would be present. As a result, response_text and usage for original runs were empty.

The actual metadata existed in llm_chat_turns (DuckDB side) or pipeline_event_inbox (PostgreSQL side), requiring a fix to fetch from the correct source.

Problem 3: turn_index Collision in Replay Branches

Replay branches had turn_index = parent_turn_index + 1 hardcoded, colliding with the existing next turn. Since reads used only ORDER BY turn_index ASC, ordering between sibling branches and existing serial turns became unstable, breaking the premise that “any node can be a replay root.”

Fix: Abandon the turn_index-based serial model and switch to a parent-child tree structure via the branch node table.

Problem 4: Unimplemented Core Assets

Materialization for conversation_sessions and conversation_branch_nodes was missing, meaning depth / branch_kind / context_snapshot_id / variant_config / node_status from the spec were never stored, leaving the system insufficient as a lineage-aware branching graph.

Remaining Issues After Initial Fix

Two points remained unresolved after the first round of fixes:

  • Whether to exclude replay nodes during normal materialization, or to skip turns with no original source in build_execution_record_from_event()
  • conversation_branch_nodes requires 2-pass construction for parent references

Final fix approach:

  • Include topology columns (parent_node_id, depth, branch_kind) in upsert_branch_node() conflict updates
  • Populate status / error / tool_trace from pipeline_event for original runs as well

The Core Design Principle

In one sentence, this system is a foundation for treating LLM conversations as replayable branching graphs.

The existing session / session_turn / session_lineage serve as the execution log substrate, with five experimental asset tables built on top. The goal is to perform controlled replay using any node as a context anchor, collect and evaluate response distributions, and ultimately connect to training datasets.

taskification_results is distinct from existing flow lineage and stands alone as an intermediate representation for decomposing prompts into subtask orchestration plans.

The three-tier storage separation (PostgreSQL / DuckDB / Iceberg) keeps each layer’s design decisions independent.


Results

  • Designed three asset groups—Conversation Lineage, Evaluation & Distribution Labeling, Training Dataset Generation—adding approximately 30 assets in total
  • Finalized a three-tier storage strategy: PostgreSQL / DuckDB / Iceberg
  • Completed Phase 1 (conversation lineage foundation) implementation
  • Discovered and fixed four critical design problems through code review
  • Clarified remaining issues: branch node 2-pass construction, context_anchor-based grouping, replay node exclusion logic
  • Established implementation priority as five stages: replay foundation → taskification → trajectory normalization → evaluation → dataset generation

Subsequent Pivot

During implementation and operation of this design, the direction shifted in several ways.

Adding Telemetry and Pivoting to a Visualization Pipeline

One shortcoming of expressing conversation lineage through Dagster’s asset graph was that it was hard for humans to grasp the real-time state. I added a telemetry layer and settled on routing metrics through Vector → Prometheus → Grafana. Drawing conversation state, branching, and evaluation results as Grafana dashboards made it possible to observe LLM-to-LLM interactions in real time in a human-readable form, without them becoming a black box.

Flattening Dagster Assets

With visualization moved to the telemetry side, there was no longer a strong reason to maintain complex Dagster asset structures. Rather than expressing the conversation tree as a deep dependency graph, I pivoted to flattening the assets.

Redesigning agent-gateway as an Orchestrator Model

A larger change was overhauling the agent-gateway architecture itself. The structure shifted from a single gateway processing all requests to a system where an orchestrator model drives a set of subordinate worker LLM models. With LLMs interacting with each other to process tasks, materializing those interaction flows as assets and visualizing them in Grafana in real time became important.

The details of this redesign are planned for a separate write-up.