Guidesデータ移行スクリプト
This is a Jupyter notebook

Langfuseプロジェクト間のデータ移行 (Python SDK v4)

このノートブックでは、Langfuse Python SDK v4 を使って、あるLangfuseプロジェクトから別のLangfuseプロジェクトへデータを移行します。

よくあるユースケース:

  • クラウドリージョン間の移行 (US ↔ EU ↔ JP ↔ HIPAA)
  • セルフホスト環境から Langfuse Cloud への移行

移行対象

  1. スコア設定 (Score Configs)
  2. カスタムモデル定義
  3. プロンプト (全バージョン)
  4. オブザベーション (トレース + ネストされたオブザベーション。OTLPエンドポイント経由で送るため、元の start_time / end_time がそのまま保持されます)
  5. スコア
  6. データセット (アイテム + 実行アイテム。移行後のトレース/オブザベーションに紐付けられます)

移行されないもの

Langfuse の公開APIでは現在、次のリソースをプログラムから作成することはできません: LLM-as-a-Judge の評価設定カスタムダッシュボードユーザー / RBAC / SSOプロジェクトや組織の設定。これらは手動で作り直すか、UI上のエクスポート機能を使って移してください。

0. セットアップ

%pip install --quiet "langfuse>=4.0.0" "opentelemetry-sdk>=1.25.0" "opentelemetry-exporter-otlp-proto-http>=1.25.0"
import os

# --- 移行元 (SOURCE) プロジェクトの認証情報 ---
os.environ["LANGFUSE_SOURCE_PUBLIC_KEY"] = "pk-lf-.."
os.environ["LANGFUSE_SOURCE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_SOURCE_HOST"] = "https://cloud.langfuse.com"  # 例: US クラウド

# --- 移行先 (DESTINATION) プロジェクトの認証情報 ---
os.environ["LANGFUSE_DEST_PUBLIC_KEY"] = "pk-lf-.."
os.environ["LANGFUSE_DEST_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_DEST_HOST"] = "https://jp.cloud.langfuse.com"  # 例: JP クラウド

# --- オブザベーション移行の時間フィルタ (任意, ISO 8601) ---
# 空にすると全トレースが対象になります。
os.environ["LANGFUSE_MIGRATE_FROM_TIMESTAMP"] = "2026-03-01T00:00:00Z"  # 例: "2025-01-01T00:00:00Z"
os.environ["LANGFUSE_MIGRATE_TO_TIMESTAMP"] = "2026-04-20T00:00:00Z"    # 例: "2025-02-01T00:00:00Z"
import base64
import datetime as dt
import json
import re
import time
from hashlib import sha256
from typing import Any, Dict, List, Optional

from langfuse import Langfuse

SOURCE_CFG = {
    "public_key": os.environ["LANGFUSE_SOURCE_PUBLIC_KEY"],
    "secret_key": os.environ["LANGFUSE_SOURCE_SECRET_KEY"],
    "base_url": os.environ.get("LANGFUSE_SOURCE_HOST", "https://cloud.langfuse.com").rstrip("/"),
}
DEST_CFG = {
    "public_key": os.environ["LANGFUSE_DEST_PUBLIC_KEY"],
    "secret_key": os.environ["LANGFUSE_DEST_SECRET_KEY"],
    "base_url": os.environ.get("LANGFUSE_DEST_HOST", "https://cloud.langfuse.com").rstrip("/"),
}

# 整合性チェック: 移行元と移行先では必ず別々の public_key を使ってください。Langfuse v4 の
# リソースマネージャは public_key をキーにしたシングルトンなので、同じキーを使い回すと
# 2つ目のクライアントが1つ目の base_url / secret_key を暗黙的に共有してしまいます。
assert SOURCE_CFG["public_key"] != DEST_CFG["public_key"], (
    "移行元と移行先では別々の public_key を指定してください。Langfuse v4 の "
    "リソースマネージャは public_key をキーにしたシングルトンなので、両プロジェクトで "
    "同じキーを使うと、2つ目のクライアントが1つ目のホストと secret_key を "
    "使い回してしまいます。"
)

# SDK のコンストラクタが拾ってしまう可能性のある Langfuse 関連の環境変数を念のため
# クリアしておきます。特に `LANGFUSE_BASE_URL` は `host=` 引数より優先されますし、
# `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY` もフォールバックとして使われます。
# ここでは上で明示的に指定したプロジェクトごとの値を常に優先させたいので、全部消します。
for _leak in (
    "LANGFUSE_BASE_URL",
    "LANGFUSE_HOST",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_TRACING_ENVIRONMENT",
    "LANGFUSE_RELEASE",
):
    os.environ.pop(_leak, None)

# 移行元・移行先用に独立した Langfuse クライアントを2つ作ります。`tracing_enabled=False`
# を指定して、各クライアントのバックグラウンド OTel エクスポーターを無効化し、
# REST + ヘルパーのラッパーとしてだけ使います。オブザベーションのセクションでは、
# 専用の OTLP パイプラインを別に組みます。
# `host=` ではなく `base_url=` を使っているのは、`base_url` が Langfuse のコンストラクタ
# 内で最優先され、環境変数で上書きされないからです。
# `timeout=60` は SDK のデフォルト (5秒) を上書きするためです。ネストされたオブザベーション
# がたくさんある大きなトレースだと、`trace.get()` に5秒以上かかることがよくあります。
src = Langfuse(**SOURCE_CFG, tracing_enabled=False, timeout=60)
dst = Langfuse(**DEST_CFG, tracing_enabled=False, timeout=60)

assert src.auth_check(), "移行元の認証情報が無効です"
assert dst.auth_check(), "移行先の認証情報が無効です"
print(f"移行元:   {SOURCE_CFG['base_url']}")
print(f"移行先:   {DEST_CFG['base_url']}")
# 各セクションで共通で使うユーティリティ関数です。

_HEX32 = re.compile(r"^[0-9a-f]{32}$")
_HEX16 = re.compile(r"^[0-9a-f]{16}$")


def dumps(obj: Any) -> str:
    """UTF-8 セーフな JSON シリアライズ (日本語など非 ASCII を読める形で保持)。"""
    return json.dumps(obj, ensure_ascii=False, default=str)


def to_otel_trace_id(source_trace_id: str) -> str:
    """移行元の ID に対して、有効な 32 文字の hex (Langfuse/OTel trace ID) を返します。

    - 元の ID がすでに 32 文字の小文字 hex ならそのまま返すので、移行先でも
      元の ID が保持されます。
    - それ以外の場合は、元の ID から決定論的に (sha256) 派生させるので、
      再実行しても常に同じ移行先 ID になります。
    """
    sid = source_trace_id.lower()
    if _HEX32.match(sid):
        return sid
    return sha256(sid.encode("utf-8")).hexdigest()[:32]


def to_otel_span_id(source_obs_id: str) -> str:
    """移行元のオブザベーション ID に対して、有効な 16 文字の hex (OTel span ID) を返します。"""
    sid = source_obs_id.lower()
    if _HEX16.match(sid):
        return sid
    return sha256(sid.encode("utf-8")).hexdigest()[:16]


import httpx

_RETRIABLE_CODES = {429, 500, 502, 503, 504}
# リトライ対象にしたい一時的なネットワークエラー (status_code 属性を持たないもの)。
_RETRIABLE_EXCEPTIONS = (
    httpx.TimeoutException,          # ReadTimeout / ConnectTimeout / WriteTimeout / PoolTimeout を含む
    httpx.RemoteProtocolError,       # サーバーが応答の途中で接続を切った場合
    httpx.ConnectError,              # DNS / TCP レベルの失敗
    httpx.NetworkError,              # ソケットレベルのエラー全般
    TimeoutError,                    # 標準ライブラリの asyncio / socket タイムアウト
    ConnectionError,                 # 標準ライブラリの接続リセット
)


def with_retries(fn, *, max_retries: int = 5, base_sleep: float = 0.5):
    """レート制限 / 5xx / 一時的なネットワークエラーに対して指数バックオフでリトライします。

    リトライ対象は (a) Fern が生成した HTTP エラーのうち `status_code` が
    {429, 500, 502, 503, 504} に含まれるもの、または (b) `_RETRIABLE_EXCEPTIONS`
    に列挙した一時的なネットワーク例外クラスです。それ以外はそのまま送出します。
    """
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as exc:
            status = getattr(exc, "status_code", None)
            retriable = status in _RETRIABLE_CODES or isinstance(exc, _RETRIABLE_EXCEPTIONS)
            if not retriable or attempt == max_retries - 1:
                raise
            sleep_for = base_sleep * (2 ** attempt)
            label = status if status is not None else type(exc).__name__
            print(f"    リトライします ({label}) — {sleep_for:.1f}秒待機")
            time.sleep(sleep_for)


def parse_ts(s: Optional[str]) -> Optional[dt.datetime]:
    if not s:
        return None
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"
    d = dt.datetime.fromisoformat(s)
    return d.replace(tzinfo=dt.timezone.utc) if d.tzinfo is None else d


FROM_TS = parse_ts(os.environ.get("LANGFUSE_MIGRATE_FROM_TIMESTAMP") or None)
TO_TS = parse_ts(os.environ.get("LANGFUSE_MIGRATE_TO_TIMESTAMP") or None)
print(f"時間フィルタ: from={FROM_TS}  to={TO_TS}")

1. スコア設定 (Score Configs)

スコア設定は最初に移行しておく必要があります。こうすると、移行先で作成するスコアやアノテーションキューが ID で参照できるようになります。あとのセクションで参照をつなぎ直すために config_id_map を作っておきます。

from langfuse.api import ConfigCategory

config_id_map: Dict[str, str] = {}
existing_by_name = {c.name: c for c in dst.api.score_configs.get(limit=100).data}

page = 1
migrated = skipped = failed = 0
while True:
    resp = with_retries(lambda: src.api.score_configs.get(page=page, limit=100))
    if not resp.data:
        break
    for cfg in resp.data:
        if cfg.name in existing_by_name:
            config_id_map[cfg.id] = existing_by_name[cfg.name].id
            skipped += 1
            continue
        try:
            # Langfuse API は CATEGORICAL 設定のときだけ `categories` を受け付け
            # (BOOLEAN のカテゴリはサーバー側で自動生成されます)、NUMERIC 設定の
            # ときだけ `min_value` / `max_value` を受け付けます。サーバーが弾く
            # フィールドを送らないよう、kwargs を条件分岐で組み立てます。
            create_kwargs: Dict[str, Any] = {
                "name": cfg.name,
                "data_type": cfg.data_type,
                "description": cfg.description,
            }
            if cfg.data_type == "CATEGORICAL" and cfg.categories:
                create_kwargs["categories"] = [
                    ConfigCategory(label=c.label, value=c.value) for c in cfg.categories
                ]
            if cfg.data_type == "NUMERIC":
                if cfg.min_value is not None:
                    create_kwargs["min_value"] = cfg.min_value
                if cfg.max_value is not None:
                    create_kwargs["max_value"] = cfg.max_value
            created = with_retries(lambda: dst.api.score_configs.create(**create_kwargs))
            config_id_map[cfg.id] = created.id
            migrated += 1
        except Exception as exc:
            print(f"  スコア設定 '{cfg.name}' の移行に失敗: {exc}")
            failed += 1
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

print(f"スコア設定 — 移行: {migrated}, スキップ (既存): {skipped}, 失敗: {failed}")

2. カスタムモデル定義

ユーザー定義のモデルだけを移行します。Langfuse 側で管理されているモデルはデフォルトで全プロジェクトに存在するので、ここではスキップします。

page = 1
migrated = skipped = failed = 0
while True:
    resp = with_retries(lambda: src.api.models.list(page=page, limit=100))
    if not resp.data:
        break
    for m in resp.data:
        if getattr(m, "is_langfuse_managed", False):
            skipped += 1
            continue
        try:
            # サーバーはフラットな価格と pricing_tiers が混在したリクエストを弾きます。
            # 移行元が pricing_tiers を使っているモデル (例: gpt-image-1 のような
            # トークン段階制のモデル) の場合はそれを転送し、フラットな価格は送らないように
            # します。それ以外の場合はフラットな価格だけを送ります。
            create_kwargs: Dict[str, Any] = {
                "model_name": m.model_name,
                "match_pattern": m.match_pattern,
                "start_date": m.start_date,
                "unit": m.unit,
                "tokenizer_id": m.tokenizer_id,
                "tokenizer_config": m.tokenizer_config,
            }
            if getattr(m, "pricing_tiers", None):
                create_kwargs["pricing_tiers"] = m.pricing_tiers
            else:
                if m.input_price is not None:
                    create_kwargs["input_price"] = m.input_price
                if m.output_price is not None:
                    create_kwargs["output_price"] = m.output_price
                if m.total_price is not None:
                    create_kwargs["total_price"] = m.total_price
            with_retries(lambda: dst.api.models.create(**create_kwargs))
            migrated += 1
        except Exception as exc:
            if getattr(exc, "status_code", None) == 409 or "already exists" in str(exc).lower():
                skipped += 1
            else:
                print(f"  モデル '{m.model_name}' の移行に失敗: {exc}")
                failed += 1
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

print(f"カスタムモデル — 移行: {migrated}, スキップ: {skipped}, 失敗: {failed}")

3. プロンプト

すべてのプロンプトについて、全バージョンを移行します。バージョン番号は移行先で新しく振り直されますが、元のバージョン番号は commit_message に残しておくので後から追跡できます。タグとラベルはそのまま引き継がれます。

from urllib.parse import quote

from langfuse.api import (
    CreateChatPromptRequest,
    CreateChatPromptType,
    CreateTextPromptRequest,
    CreateTextPromptType,
    Prompt_Chat,
    Prompt_Text,
)

page = 1
prompt_versions = []  # (name, version) のタプル
while True:
    resp = with_retries(lambda: src.api.prompts.list(page=page, limit=100))
    if not resp.data:
        break
    for meta in resp.data:
        for v in meta.versions:
            prompt_versions.append((meta.name, v))
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

prompt_versions.sort(key=lambda x: (x[0], x[1]))
print(f"移行元に {len(prompt_versions)} 個のプロンプトバージョンが見つかりました")

migrated = failed = 0
for name, version in prompt_versions:
    try:
        # フォルダ形式のプロンプト (名前に '/' を含むもの) は、GET エンドポイントで
        # パスセグメントを URL エンコードする必要があります (Fern クライアントは
        # 自動ではやってくれません)。create() のリクエストボディには生の名前をそのまま
        # 渡します。
        encoded_name = quote(name, safe="")
        p = with_retries(lambda: src.api.prompts.get(encoded_name, version=version))
        commit_msg = f"移行元から移行しました。元のバージョン: {version}。"
        if isinstance(p, Prompt_Chat):
            req = CreateChatPromptRequest(
                name=p.name,
                type=CreateChatPromptType.CHAT,
                prompt=p.prompt,
                labels=p.labels or [],
                tags=p.tags or [],
                config=p.config,
                commit_message=commit_msg,
            )
        elif isinstance(p, Prompt_Text):
            req = CreateTextPromptRequest(
                name=p.name,
                type=CreateTextPromptType.TEXT,
                prompt=p.prompt,
                labels=p.labels or [],
                tags=p.tags or [],
                config=p.config,
                commit_message=commit_msg,
            )
        else:
            print(f"  {name} v{version} のプロンプト型が不明です: {type(p).__name__}")
            failed += 1
            continue
        with_retries(lambda: dst.api.prompts.create(request=req))
        migrated += 1
    except Exception as exc:
        print(f"  {name} v{version} の移行に失敗: {exc}")
        failed += 1

print(f"プロンプト — 移行: {migrated}, 失敗: {failed}")

4. オブザベーション (OTLPインジェストエンドポイント経由)

ここがデータ移行のメインパートです。移行元の各トレース (とネストされたオブザベーション) を、移行先の OTLP インジェストエンドポイント /api/public/otel/v1/traces に送ります。オブザベーションの start_time / end_time を取り込み時点で正しくセットできるのは OTLP エンドポイント だけ です。従来のREST経由のインジェストだと、これらのフィールドは上書きされてしまいます。

セルフホスト環境の場合: Observations API v2 は現状 Cloud 専用 です。セルフホストの Langfuse から移行する場合は、下の observations.get_many(...) の呼び出しを src.api.trace.get(trace.id).observations (v1 エンドポイント) に置き換えてください。セルフホスト向けの v2 対応はロードマップに入っています。

from opentelemetry import trace as otel_trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags

# --- 移行先プロジェクトに向けた専用の OTLP パイプラインを組み立てる ---
_auth = base64.b64encode(
    f"{DEST_CFG['public_key']}:{DEST_CFG['secret_key']}".encode("utf-8")
).decode("ascii")

otlp_provider = TracerProvider(resource=Resource.create({"service.name": "langfuse-migration"}))
otlp_provider.add_span_processor(
    SimpleSpanProcessor(
        OTLPSpanExporter(
            endpoint=f"{DEST_CFG['base_url']}/api/public/otel/v1/traces",
            headers={
                "Authorization": f"Basic {_auth}",
                "x-langfuse-ingestion-version": "4",
            },
        )
    )
)
migration_tracer = otlp_provider.get_tracer("langfuse-migration")


def _to_ns(d: Optional[dt.datetime]) -> Optional[int]:
    if d is None:
        return None
    if d.tzinfo is None:
        d = d.replace(tzinfo=dt.timezone.utc)
    return int(d.timestamp() * 1_000_000_000)


def _set_scalar(span, key: str, value: Any) -> None:
    """`span` に OTel 属性をセットします。プリミティブ以外は UTF-8 JSON で文字列化します。"""
    if value is None:
        return
    if isinstance(value, (str, bool, int, float)):
        span.set_attribute(key, value)
        return
    span.set_attribute(key, dumps(value))


def _as_attr_string(value: Any) -> Optional[str]:
    """input / output / metadata の値を JSON 属性文字列に変換します。

    Observations API v2 は input / output / metadata を生の JSON 文字列で返しますが、
    Trace の一覧取得 API はパース済みの dict / list で返してきます。文字列はそのまま
    通して二重エンコードを防ぎ、それ以外は ensure_ascii=False で JSON 化します
    (日本語もそのまま読める形になります)。
    """
    if value is None:
        return None
    if isinstance(value, str):
        return value
    return dumps(value)


def _set_trace_attrs(span, trace_obj) -> None:
    """ルートスパンに trace レベルの langfuse.* 属性をセットします。"""
    _set_scalar(span, "langfuse.trace.name", trace_obj.name)
    _set_scalar(span, "langfuse.user.id", trace_obj.user_id)
    _set_scalar(span, "langfuse.session.id", trace_obj.session_id)
    _set_scalar(span, "langfuse.release", trace_obj.release)
    _set_scalar(span, "langfuse.version", trace_obj.version)
    _set_scalar(span, "langfuse.environment", trace_obj.environment or "default")
    if trace_obj.public is not None:
        span.set_attribute("langfuse.trace.public", bool(trace_obj.public))
    if trace_obj.tags:
        # Langfuse は string[] 属性としても JSON 文字列としても受け付けます。
        span.set_attribute("langfuse.trace.tags", list(trace_obj.tags))
    input_str = _as_attr_string(trace_obj.input)
    if input_str is not None:
        span.set_attribute("langfuse.trace.input", input_str)
    output_str = _as_attr_string(trace_obj.output)
    if output_str is not None:
        span.set_attribute("langfuse.trace.output", output_str)
    md = trace_obj.metadata
    if isinstance(md, dict):
        for k, v in md.items():
            _set_scalar(span, f"langfuse.trace.metadata.{k}", v)
    elif md is not None:
        _set_scalar(span, "langfuse.trace.metadata._raw", md)


_OBS_TYPE_MAP = {
    "SPAN": "span",
    "GENERATION": "generation",
    "EVENT": "event",
    "AGENT": "span",
    "TOOL": "span",
    "CHAIN": "span",
    "RETRIEVER": "span",
    "EVALUATOR": "span",
    "EMBEDDING": "generation",
    "GUARDRAIL": "span",
}


def _set_observation_attrs(span, obs) -> None:
    obs_type = _OBS_TYPE_MAP.get(obs.type, "span")
    span.set_attribute("langfuse.observation.type", obs_type)
    _set_scalar(span, "langfuse.observation.level", obs.level)
    _set_scalar(span, "langfuse.observation.status_message", obs.status_message)
    _set_scalar(span, "langfuse.version", obs.version)
    _set_scalar(span, "langfuse.environment", obs.environment or "default")
    input_str = _as_attr_string(obs.input)
    if input_str is not None:
        span.set_attribute("langfuse.observation.input", input_str)
    output_str = _as_attr_string(obs.output)
    if output_str is not None:
        span.set_attribute("langfuse.observation.output", output_str)
    md = obs.metadata
    if isinstance(md, dict):
        for k, v in md.items():
            _set_scalar(span, f"langfuse.observation.metadata.{k}", v)
    elif md is not None:
        _set_scalar(span, "langfuse.observation.metadata._raw", md)
    # Generation でのみ使う属性。ObservationV2 では `model` が `provided_model_name` に
    # 改名されているので、セルフホスト用の v1 型のときは `model` にフォールバックします。
    if obs_type == "generation":
        model_name = getattr(obs, "provided_model_name", None) or getattr(obs, "model", None)
        _set_scalar(span, "langfuse.observation.model.name", model_name)
        if obs.model_parameters:
            span.set_attribute("langfuse.observation.model.parameters", dumps(obs.model_parameters))
        if obs.usage_details:
            span.set_attribute("langfuse.observation.usage_details", dumps(obs.usage_details))
        if obs.cost_details:
            span.set_attribute("langfuse.observation.cost_details", dumps(obs.cost_details))
        if obs.completion_start_time is not None:
            d = obs.completion_start_time
            if d.tzinfo is None:
                d = d.replace(tzinfo=dt.timezone.utc)
            span.set_attribute(
                "langfuse.observation.completion_start_time",
                d.isoformat().replace("+00:00", "Z"),
            )
        if getattr(obs, "prompt_name", None):
            _set_scalar(span, "langfuse.observation.prompt.name", obs.prompt_name)
        if getattr(obs, "prompt_version", None) is not None:
            span.set_attribute("langfuse.observation.prompt.version", int(obs.prompt_version))


def _parent_ctx(trace_id_hex: str, parent_span_id_hex: Optional[str]):
    if not parent_span_id_hex:
        return None
    parent_sc = SpanContext(
        trace_id=int(trace_id_hex, 16),
        span_id=int(parent_span_id_hex, 16),
        is_remote=True,
        trace_flags=TraceFlags(0x01),
    )
    return otel_trace.set_span_in_context(NonRecordingSpan(parent_sc))


trace_id_map: Dict[str, str] = {}
obs_id_map: Dict[str, str] = {}

# Observations API v2 から取得したいフィールドグループです。`core` は常に含まれます。
# `basic` で name / level / status / version / environment が、`time` で
# completionStartTime が、`io` で input / output が、`metadata` で metadata が、
# `model` で providedModelName / modelParameters が、`usage` で
# usageDetails / costDetails が、`prompt` で promptName / promptVersion が追加されます。
OBSERVATIONS_FIELDS = "core,basic,time,io,metadata,model,usage,prompt"


def _fetch_observations_v2(source_trace_id: str) -> List[Any]:
    """指定したトレースの全オブザベーションを v2 API からカーソルページングで取得します。

    Cloud 専用です。セルフホストから移行する場合は、このヘルパーを次の実装に置き換えてください:

        return with_retries(lambda: src.api.trace.get(source_trace_id)).observations
    """
    results: List[Any] = []
    cursor: Optional[str] = None
    while True:
        resp = with_retries(lambda: src.api.observations.get_many(
            trace_id=source_trace_id,
            fields=OBSERVATIONS_FIELDS,
            limit=1000,
            cursor=cursor,
        ))
        if resp.data:
            results.extend(resp.data)
        cursor = getattr(resp.meta, "cursor", None)
        if not cursor:
            break
    return results


def migrate_one_trace(trace_obj) -> int:
    """1つのトレースとそのオブザベーションを移行します。戻り値は子オブザベーションの件数です。"""
    observations = _fetch_observations_v2(trace_obj.id)
    new_trace_hex = to_otel_trace_id(trace_obj.id)
    trace_id_map[trace_obj.id] = new_trace_hex

    # オブザベーションをソート: 親が子より先、開始時刻の早い順。
    observations.sort(key=lambda o: (o.start_time, o.id))

    # トレースレベルの属性を載せるルートスパン。移行元に明示的なルートオブザベーションが
    # ない場合は、トレース自身のタイムスタンプで合成します。
    root_span_hex = to_otel_span_id(trace_obj.id)  # ルートスパン ID を決定論的に生成
    trace_start = min((o.start_time for o in observations), default=trace_obj.timestamp)
    trace_end = max(
        (o.end_time or o.start_time for o in observations), default=trace_obj.timestamp
    )

    # --- ルートスパン ---
    root_parent_sc = SpanContext(
        trace_id=int(new_trace_hex, 16),
        span_id=int(root_span_hex, 16),
        is_remote=True,
        trace_flags=TraceFlags(0x01),
    )
    # ルートスパンを作るときは、自身の SpanContext を親コンテキストとして渡すことで
    # trace_id を固定します。そして、その下に「ルートオブザベーションとしてふるまう」
    # 子スパンを作ります。こうしておくとネストのコードが一貫した書き方になります。
    root_ctx = otel_trace.set_span_in_context(NonRecordingSpan(root_parent_sc))
    root_span = migration_tracer.start_span(
        name=trace_obj.name or "trace",
        start_time=_to_ns(trace_start),
        context=root_ctx,
    )
    try:
        _set_trace_attrs(root_span, trace_obj)
        root_span.set_attribute("langfuse.observation.type", "span")
        # ルートスパンの span_id を押さえておき、親が None の (= 移行元でルートだった)
        # オブザベーションの子がこの下にネストされるようにします。
        actual_root_span_id_hex = format(root_span.get_span_context().span_id, "016x")

        for obs in observations:
            new_span_hex = to_otel_span_id(obs.id)
            obs_id_map[obs.id] = new_span_hex
            parent_hex = (
                obs_id_map.get(obs.parent_observation_id)
                if obs.parent_observation_id
                else actual_root_span_id_hex
            )
            child_ctx = _parent_ctx(new_trace_hex, parent_hex)
            child = migration_tracer.start_span(
                name=obs.name or obs.type.lower(),
                start_time=_to_ns(obs.start_time),
                context=child_ctx,
            )
            try:
                _set_observation_attrs(child, obs)
            finally:
                child.end(end_time=_to_ns(obs.end_time) or _to_ns(obs.start_time))
    finally:
        root_span.end(end_time=_to_ns(trace_end))

    return len(observations)


# --- 移行ループを実行 ---
page = 1
limit = 50
total = 0
total_obs = 0
failures = 0
progress_every = 10  # このトレース数ごとに進捗ログを出します
t_start = time.time()
while True:
    # trace.list に `fields="core,io"` を指定すると、一覧レスポンスに
    # input / output / metadata が含まれるので、トレースごとの追加リクエストが不要になります。
    listing = with_retries(lambda: src.api.trace.list(
        page=page,
        limit=limit,
        order_by="timestamp.asc",
        from_timestamp=FROM_TS,
        to_timestamp=TO_TS,
        fields="core,io",
    ))
    if not listing.data:
        break

    total_items = getattr(listing.meta, "total_items", None)
    total_pages = getattr(listing.meta, "total_pages", None)
    if page == 1 and total_items is not None:
        print(f"移行を開始します: {total_items} 件のトレース / {total_pages} ページ")

    for t in listing.data:
        try:
            obs_count = migrate_one_trace(t)
            total += 1
            total_obs += obs_count
            if total % progress_every == 0:
                elapsed = time.time() - t_start
                rate = total / elapsed if elapsed > 0 else 0
                progress = (
                    f"{total}/{total_items}" if total_items is not None else str(total)
                )
                print(
                    f"  [{progress} traces] observations={total_obs} "
                    f"failed={failures} rate={rate:.1f} traces/s"
                )
        except Exception as exc:
            print(f"  トレース {t.id} の移行に失敗: {exc}")
            failures += 1

    print(
        f"  ページ {page}/{total_pages or '?'} 完了 — "
        f"累計: traces={total}, observations={total_obs}, failed={failures}"
    )
    if total_pages is not None and page >= total_pages:
        break
    page += 1

otlp_provider.force_flush()
elapsed = time.time() - t_start
print(
    f"\nオブザベーション — 移行: {total} トレース / {total_obs} オブザベーション, "
    f"失敗: {failures}, 経過時間: {elapsed:.1f}s"
)
print(f"  trace_id_map エントリ数: {len(trace_id_map)}")
print(f"  obs_id_map エントリ数:   {len(obs_id_map)}")

5. スコア

スコアは移行元から api.scores.get_many で取得し、移行先ではネイティブの create_score ヘルパーで再作成します。元の id をそのまま保持するので、スクリプトを再実行しても冪等に動きます。config_id はセクション1の config_id_map で、trace_id / observation_id はセクション4の trace_id_map / obs_id_map でそれぞれマッピングし直します。

trace_id_map = globals().get("trace_id_map", {})
obs_id_map = globals().get("obs_id_map", {})
config_id_map = globals().get("config_id_map", {})

page = 1
migrated = failed = skipped = 0
while True:
    resp = with_retries(lambda: src.api.scores.get_many(
        page=page,
        limit=100,
        from_timestamp=FROM_TS,
        to_timestamp=TO_TS,
    ))
    if not resp.data:
        break
    for s in resp.data:
        # 参照をマッピングし直す
        new_trace_id = trace_id_map.get(s.trace_id, s.trace_id) if s.trace_id else None
        new_obs_id = obs_id_map.get(s.observation_id, s.observation_id) if s.observation_id else None
        new_cfg_id = config_id_map.get(s.config_id, s.config_id) if s.config_id else None

        # 型に応じた値を決める
        if s.data_type == "CATEGORICAL":
            value = getattr(s, "string_value", None) or str(s.value)
        else:
            value = s.value

        if value is None:
            skipped += 1
            continue

        try:
            with_retries(lambda: dst.create_score(
                name=s.name,
                value=value,
                data_type=s.data_type,
                trace_id=new_trace_id,
                observation_id=new_obs_id,
                session_id=getattr(s, "session_id", None),
                dataset_run_id=getattr(s, "dataset_run_id", None),
                score_id=s.id,  # ID を保持するので、再実行しても重複なく冪等に処理されます
                comment=s.comment,
                config_id=new_cfg_id,
                metadata=s.metadata,
                timestamp=s.timestamp,
            ))
            migrated += 1
        except Exception as exc:
            print(f"  スコア {s.id} ({s.name}) の移行に失敗: {exc}")
            failed += 1
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

# スコアをフラッシュする (create_score は dst クライアントのキューにイベントを積むので)
dst.flush()
print(f"スコア — 移行: {migrated}, スキップ (値が null): {skipped}, 失敗: {failed}")

6. データセット (アイテム + 実行アイテム)

データセット本体はネイティブの create_dataset ヘルパーで、アイテムは create_dataset_item (移行元のアイテムIDを維持) で、実行アイテム (run items) は REST エンドポイント経由で移行します。実行アイテムからは、trace_id_map / obs_id_map を使って移行済みのトレース/オブザベーションを参照します。

# このセルだけ単体で実行する場合に備えて、マップが無ければ空で初期化します (スコアのセクション参照)。
trace_id_map = globals().get("trace_id_map", {})
obs_id_map = globals().get("obs_id_map", {})

from urllib.parse import quote as _urlquote


def _enc(name: str) -> str:
    return _urlquote(name, safe="")


ds_migrated = ds_skipped = ds_failed = 0
items_migrated = items_failed = 0
runs_created = runs_failed = 0

t_start = time.time()
ds_seen = 0  # 処理した移行元データセット数 (新規 + 既存)

page_ds = 1
while True:
    dsresp = with_retries(lambda: src.api.datasets.list(page=page_ds, limit=100))
    if not dsresp.data:
        break

    total_ds = getattr(dsresp.meta, "total_items", None)
    total_pages = getattr(dsresp.meta, "total_pages", None)
    if page_ds == 1 and total_ds is not None:
        print(f"データセットの移行を開始します: {total_ds} 件 / {total_pages} ページ")

    for ds in dsresp.data:
        ds_seen += 1
        progress = f"{ds_seen}/{total_ds}" if total_ds is not None else str(ds_seen)
        print(f"  [{progress}] '{ds.name}'")

        # データセットが無ければ作成します。フォルダ形式のデータセット (名前に "/" を含むもの)
        # の場合、パスパラメータは URL エンコードが必要です。
        try:
            dst.api.datasets.get(dataset_name=_enc(ds.name))
            ds_skipped += 1
            print(f"    データセットは移行先にすでに存在します — 作成をスキップします")
        except Exception as exc:
            if getattr(exc, "status_code", None) != 404:
                print(f"    データセットの取得に失敗: {exc}")
                ds_failed += 1
                continue
            try:
                with_retries(lambda: dst.create_dataset(
                    name=ds.name, description=ds.description, metadata=ds.metadata
                ))
                ds_migrated += 1
                print(f"    移行先にデータセットを作成しました")
            except Exception as exc2:
                print(f"    データセットの作成に失敗: {exc2}")
                ds_failed += 1
                continue

        # アイテム (実行アイテムから正しく参照できるよう、移行元の ID を維持します)
        ds_items_before = items_migrated
        ds_items_failed_before = items_failed
        page_item = 1
        while True:
            iresp = with_retries(lambda: src.api.dataset_items.list(
                dataset_name=ds.name, page=page_item, limit=100
            ))
            if not iresp.data:
                break
            for it in iresp.data:
                try:
                    with_retries(lambda: dst.create_dataset_item(
                        dataset_name=ds.name,
                        id=it.id,
                        input=it.input,
                        expected_output=it.expected_output,
                        metadata=it.metadata,
                        source_trace_id=trace_id_map.get(it.source_trace_id, it.source_trace_id) if it.source_trace_id else None,
                        source_observation_id=obs_id_map.get(it.source_observation_id, it.source_observation_id) if it.source_observation_id else None,
                        status=it.status,
                    ))
                    items_migrated += 1
                except Exception as exc:
                    print(f"      アイテム {it.id} の移行に失敗: {exc}")
                    items_failed += 1
            if page_item >= getattr(iresp.meta, "total_pages", page_item):
                break
            page_item += 1
        ds_items = items_migrated - ds_items_before
        ds_items_fail = items_failed - ds_items_failed_before
        print(f"    アイテム: 移行 {ds_items} 件, 失敗 {ds_items_fail} 件")

        # 実行アイテム。パスパラメータ (dataset_name, run_name) は URL エンコードが必要です。
        ds_runs_before = runs_created
        ds_runs_failed_before = runs_failed
        run_count = 0
        page_run = 1
        while True:
            rresp = with_retries(lambda: src.api.datasets.get_runs(
                dataset_name=_enc(ds.name), page=page_run, limit=100
            ))
            if not rresp.data:
                break
            for run_meta in rresp.data:
                run_count += 1
                try:
                    run_full = with_retries(lambda: src.api.datasets.get_run(
                        dataset_name=_enc(ds.name), run_name=_enc(run_meta.name)
                    ))
                except Exception as exc:
                    print(f"      実行 '{run_meta.name}' の取得に失敗: {exc}")
                    runs_failed += 1
                    continue
                for ri in run_full.dataset_run_items:
                    new_trace = trace_id_map.get(ri.trace_id, ri.trace_id) if ri.trace_id else None
                    new_obs = obs_id_map.get(ri.observation_id, ri.observation_id) if ri.observation_id else None
                    if not new_trace and not new_obs:
                        runs_failed += 1
                        continue
                    try:
                        with_retries(lambda: dst.api.dataset_run_items.create(
                            run_name=run_meta.name,
                            dataset_item_id=ri.dataset_item_id,
                            trace_id=new_trace,
                            observation_id=new_obs,
                            run_description=run_meta.description,
                            metadata=run_meta.metadata,
                        ))
                        runs_created += 1
                    except Exception as exc:
                        print(f"      実行 '{run_meta.name}' の実行アイテム作成に失敗: {exc}")
                        runs_failed += 1
            if page_run >= getattr(rresp.meta, "total_pages", page_run):
                break
            page_run += 1
        ds_runs = runs_created - ds_runs_before
        ds_runs_fail = runs_failed - ds_runs_failed_before
        print(
            f"    実行: {run_count} 件, 実行アイテムリンク {ds_runs} 件, "
            f"失敗 {ds_runs_fail} 件"
        )

    if page_ds >= getattr(dsresp.meta, "total_pages", page_ds):
        break
    page_ds += 1

elapsed = time.time() - t_start
print(
    f"\nデータセット — 新規: {ds_migrated}, 既存: {ds_skipped}, 失敗: {ds_failed}  |  "
    f"アイテム: 移行 {items_migrated} 件 / 失敗 {items_failed} 件  |  "
    f"実行アイテム: リンク {runs_created} 件 / 失敗 {runs_failed} 件  |  "
    f"経過時間: {elapsed:.1f}s"
)

Was this page helpful?