Dagster 会話リネージュ・評価・データセット生成システムの設計と実装
agent-gateway の既存 Dagster パイプライン上に、会話を再実行可能な分岐グラフとして扱う3アセットグループ(Conversation Lineage・Evaluation・Training Dataset Generation)を設計・実装した記録。ストレージ三層構造の採用、コードレビューで発見した4つの設計問題とその修正まで記録する。
結論
agent-gateway では Go + NATS + Dagster によるイベントパイプラインの実装がすでに稼働していた。この基盤の上に、LLM 会話を 再実行可能な分岐グラフ として扱うための3つのアセットグループを設計・実装した。
設計の核心は責務の分離にある。PostgreSQL を replay 必須のリネージュデータの truth source とし、DuckDB を評価・クラスタリング向けの分析ワークベンチとし、Iceberg / Parquet を凍結済みデータセットスナップショットとする三層構造をとることで、各レイヤーの設計判断が独立する。
実装フェーズでは、コードレビューにより4つの重大な設計問題(グルーピング破綻・メタデータ欠落・turn_index 衝突・中核 asset 未実装)が発見され、それぞれ修正が必要だった。
前提
既存パイプラインの状態
agent-gateway は Go + Gin で構築した OpenAI 互換プロキシゲートウェイに、NATS JetStream によるイベントストリーミング、Dagster によるパイプライン処理、PostgreSQL + pgvector によるベクトル検索、DuckDB による分析ワークベンチを統合した LLM 基盤である。
既存の pipeline_runtime アセットグループでは 11 個の Dagster アセットが稼働していた。NATS センサーがイベントを拾い、pipeline_event_inbox から llm_chat_turns、flow_nodes、flow_edges などの分析テーブルへ書き込む処理フローが確立していた。
不足していた機能
既存のパイプラインが扱うのは個々のリクエストイベントの正規化と蓄積までだった。以下は存在していなかった。
- 会話全体を 分岐可能なグラフ として扱う仕組み
- トラジェクトリの評価
- トレーニング用データセットの自動生成
ローカル OSS LLM を活用した fine-tuning・モデル評価・system prompt の最適化など、LLM データスタックを構築するためにこれらを Dagster 上に追加で設計・実装する必要があった。
3つのアセットグループの設計
Conversation Lineage Assets
会話ツリーそのものを表現するアセットグループ。
| アセット | 役割 |
|---|---|
seed_themes | 実験テーマの参照テーブル |
conversation_sessions | セッションの管理 |
conversation_branch_nodes | 会話ツリーの全ノードを保持 |
taskification_results | MCP やタスク化の中間結果 |
execution_run_records | 実行ファクト |
response_branch_records | 最終レスポンスとトレース |
各ノードは branch_kind(root・retry・tool_variant・sampling_variant・prompt_variant・taskify_variant)と variant_config を JSON で保持する。
依存グラフは直列構造になっている。
seed_themes
→ conversation_sessions
→ conversation_branch_nodes
→ taskification_results
→ execution_run_records
→ response_branch_records
リプレイルール
taskify_rule_versionが変わった場合はtaskification_results以降を再生成model_snapshot_idが変わった場合はexecution_run_records以降を再生成- 特定ノードの条件変更はそのサブツリーのみ再生成
Evaluation and Distribution Labeling Assets
会話トラジェクトリを評価可能なユニットに変換するアセットグループ。
trajectory_units # ルートからターミナルノードまでのパスを正規化
→ trajectory_embeddings # ベクトル化
→ trajectory_clusters # クラスタリング
→ cluster_category_labels # カテゴリラベル付与
→ human_sample_queue # 人間レビューキュー
→ human_eval_labels # 評価結果の蓄積
→ candidate_preference_pairs
→ preference_labels # DPO ペア構築
カテゴリラベルの例として tool_helped、tool_misuse、hallucination、good_recovery、overlong、underspecified、stable_high_quality などを定義した。
設計原則
- embedding はラベルではなく表現
- クラスタは判断ではなく構造
- モデルによるカテゴリ化と人間の選好は別レイヤーとして明確に分離する
Training Dataset Generation Assets
評価済みのトラジェクトリからトレーニングデータセットを構築するアセットグループ。
positive_topk_trajectories # 層別 top-K 抽出
negative_bottomk_trajectories # 層別 bottom-K 抽出
→ sft_messages_dataset # SFT 用データセット
→ dpo_preference_dataset # DPO 用データセット
→ suppression_dataset # 抑制用データセット
→ unlearning_dataset # アンラーニング用データセット
→ dataset_snapshot_registry # 凍結済みスナップショットのメタデータ管理
negative type として hard_negative・tool_misuse・hallucination・noise_reject・suppression_candidate を定義し、すべてを一律にアンラーニングに使うのではなくタイプ別に分離して扱う。
設計上の主要な意思決定
能動的オーケストレーターか受動的キャプチャか
仕様の seed_themes → conversation_sessions → branch_nodes が能動的に LLM 会話を駆動するものか、既存の会話を受動的に整理するものかという疑問が最初に生じた。
整理した結論として、まず手動でテーマを決めてセッション ID を発行し、LLM との対話を改善・失敗に至るまで繰り返す。次にセッション ID をもとにマテリアライズして conversation asset を構築する。「任意ノードからのリプレイ」はまず replay substrate を意味し、その上に将来的な積極的オーケストレーターを乗せる設計だという整理に落ち着いた。
分岐処理の位置
既存の session / session_turn / session_lineage テーブルを捨てるのではなく、実行ログの土台(substrate) として利用する。その上に実験用セマンティックレイヤーとして asset を構築する方針とした。
既存の親子関係のあるターン構造に対し、以下の情報を追加で保持する。
branch_kindvariant_confignode_statuscontext_snapshot_idtaskification replay point
taskification_results は既存の FlowLineageEvent とは別物で、prompt decomposition と orchestration plan の中間表現として独立させる。
ストレージ三層構造
| レイヤー | ストレージ | 用途 |
|---|---|---|
| Truth source | PostgreSQL | replay 必須のリネージュデータ |
| 分析ワークベンチ | DuckDB | 評価・クラスタリング向け派生データ |
| Immutable artifact | Iceberg / Parquet | 凍結済みデータセットスナップショット |
この分離により、リネージュの保存方式を変えても評価パイプラインには影響せず、データセットのフォーマットを変えてもリネージュに波及しない。
実装優先順位
グループ単位ではなく、実際の着手順として5段階に分解した。
- replay 可能なリネージュモデルの確立
- taskification と branch generation の導入
- trajectory 化
- embedding / clustering / labeling
- dataset generation と snapshot freeze
最優先は OSS ローカル LLM を使った replay と branch node の正規化。ここが固まらないと downstream の評価 asset も dataset asset も不安定になる。
実装計画
ディレクトリ構造
新規コードは既存の model-foundry/src/loftllc/ パッケージ配下に、pipeline_runtime と並列のモジュールとして追加する。既存の domain / infra / presentation レイヤリングに従い、以下のような構造をとる。
loftllc/
domain/
conversation_lineage.py
infra/
lineage_store.py
presentation/
dagster/
assets/
conversation_lineage.py
アセット実行パターンの違い
既存の pipeline_runtime アセットは 単一イベント駆動(NATS メッセージごとに 1 RunRequest)だが、リネージュアセットは根本的に異なる。
- NATS イベントはすでに
pipeline_runtimeで処理されllm_chat_turnsやflow_nodesに蓄積済み - リネージュアセットはそこから読み取る 下流コンシューマ として動作
- セッションの蓄積データをバッチ的にマテリアライズする方式
マテリアライゼーション戦略:
| グループ | トリガー |
|---|---|
| Group 1(Conversation Lineage) | 手動または 60 秒間隔センサー |
| Group 2(Evaluation) | Group 1 への依存による自動トリガー |
| Group 3(Dataset Generation) | 完全手動トリガー |
パーティション戦略
DynamicPartitionsDefinition を使用する。セッション ID は Go ゲートウェイのランタイムで生成されるため事前宣言できない。センサーが llm_chat_turns から新しいセッション ID を発見し、動的パーティションセットに追加してから RunRequest を発行する形をとる。Group 3 はスナップショット ID による別の動的パーティションを使用する。
リプレイメカニズム
「ノード X からのリプレイ」は以下の処理を意味する。
- ノード X の会話状態(凍結済みメッセージ履歴)を取得
- 1 つ以上の LLM にそのメッセージ履歴を送信
- 新しいレスポンスを
response_branch_recordsとして記録
LLMClientResource という新しい infra リソースを追加し、ゲートウェイの /v1/chat/completions エンドポイントへ HTTP リクエストを送る。これにより replay もゲートウェイ経由になり、同じルーティング・ロギング・NATS イベントが記録される。
PostgreSQL スキーマ設計
リネージュ用テーブルの定義。
| テーブル | 役割 |
|---|---|
seed_themes | 実験テーマの参照テーブル |
conversation_sessions | llm_chat_turns から集約されたセッションビュー |
conversation_branch_nodes | 会話内の分岐点(branch_node_id・parent_node_id・turn_index・message_history_json・branch_reason) |
taskification_results | 分解されたサブプロンプト |
execution_run_records | LLM 実行ファクト(original または replay) |
response_branch_records | 同一プロンプトに対する比較レコード(context_anchor_turn_id で分岐元を追跡) |
lineage_watermark | 増分処理用のウォーターマーク |
実装レビューで発見した設計問題
Phase 1(会話リネージュ基盤)の実装後にコードレビューを行い、4つの重大な問題が発見された。
問題1: response_branch_records のグルーピング破綻
replay ごとに新しい turn_id を採番しているのに、branch grouping が turn_id 単位で行われていたため、各 replay が別グループになり branch_index が実質 0 のまま残る問題。仕様の「shared parent / controlled branches による比較」と一致せず、side-by-side 比較や preference pair の元データにならない。
修正方針: context_anchor_turn_id をベースにしたグルーピングへ変更する。
問題2: original turn の execution_run_records メタデータ欠落
chat_history テーブルから role / content / created_at しか取得していないのに、ビルダーは model / backend / assistant_message / token_usage / duration_ms を読む前提だった。結果として original run の response_text や usage が空になる。
実際のメタデータは llm_chat_turns(DuckDB 側)または pipeline_event_inbox(PostgreSQL 側)に存在しており、正しいソースから取得するよう修正が必要だった。
問題3: replay branch の turn_index 衝突
replay branch に turn_index = parent_turn_index + 1 を固定で入れているため、既存の次ターンと衝突する問題。読み出しは ORDER BY turn_index ASC のみなので、兄弟 branch と既存直列ターンの順序が不安定になり「任意ノードを replay root にできる」前提が壊れる。
修正方針: turn_index ベースの直列モデルを廃止し、branch node table による親子ツリー構造に切り替える。
問題4: 仕様の中核 asset の未実装
conversation_sessions と conversation_branch_nodes のマテリアライゼーションが欠けており、仕様にある depth / branch_kind / context_snapshot_id / variant_config / node_status が保存されないため、lineage-aware branching graph として不十分だった。
修正後の残課題
1 回目の修正でも完全には解消せず、以下の2点が残った。
- 通常マテリアライズで replay node を除外するか、
build_execution_record_from_event()側で original source のない turn を skip するかの選択 conversation_branch_nodesの親参照 2-pass 構築が必要
最終的な修正方針:
upsert_branch_node()の conflict update に topology 系カラム(parent_node_id・depth・branch_kind)も含める- original run でも
status / error / tool_traceをpipeline_eventから埋める
設計の本質
この仕組みの位置づけを一言で表すと、LLM 会話を再実行可能な分岐グラフとして扱う基盤 である。
既存の session / session_turn / session_lineage を実行ログの土台として使い、その上に実験用 asset として5つのテーブルを構築する。目的は、任意ノードを context anchor として controlled replay を行い、応答分布を収集・評価し、最終的に学習用 dataset へ接続することだ。
taskification_results は既存 flow lineage とは別物であり、prompt を subtask 群へ分解する orchestration plan の中間表現として独立する。
ストレージの三層分離(PostgreSQL / DuckDB / Iceberg)により、各レイヤーの設計判断が独立する。
結果
- Conversation Lineage・Evaluation & Distribution Labeling・Training Dataset Generation の3アセットグループを設計し、合計約30個のアセットを追加
- PostgreSQL / DuckDB / Iceberg の三層ストレージ戦略を確定
- Phase 1(会話リネージュ基盤)の実装を完了
- コードレビューで4つの重大な設計問題を発見・修正
- 残課題(branch node の 2-pass 構築・context_anchor ベースのグルーピング・replay node の除外ロジック)を明確化
- 実装優先順位を「replay 基盤 → taskification → trajectory 化 → evaluation → dataset generation」の5段階に確定
その後の方針転換
この設計を実装・運用していく中で、方向性が変わった部分がある。
テレメトリの追加と可視化パイプラインへの転換
Dagster のアセットグラフで会話リネージュを表現するアプローチの課題として、人間がリアルタイムで状況を把握しにくい点があった。そこでテレメトリレイヤーを追加し、Vector → Prometheus → Grafana の経路でメトリクスを流す構成に落ち着いた。会話の状態・分岐・評価結果を Grafana 上のダッシュボードとして描画することで、LLM 同士の対話をブラックボックスにせず、人間が理解できる形でリアルタイムに観測できるようにした。
Dagster アセットのフラット化
可視化をテレメトリ側に移したことで、Dagster のアセット構造を複雑に保つ必然性が薄れた。深い依存グラフで会話ツリーを表現するのではなく、アセットをフラット化する方針に転換した。
agent-gateway のオーケストレーターモデルへの再設計
より大きな変化として、agent-gateway 自体のアーキテクチャを刷新した。単一のゲートウェイが全リクエストを処理する構造から、オーケストレーターモデルが従属するワーカー LLM 群を操作する仕組みへ実装し直した。LLM 同士が対話しながらタスクを処理するようになったため、その対話フローをアセット化し、Grafana でリアルタイムに可視化することが重要になった。
この再設計の詳細は別の機会に記録する予定。
