Guides › Cookbook › Data Migration Script

This is a Jupyter notebook.

Migrating Data Between Langfuse Projects (Python SDK v4)

This notebook migrates data from one Langfuse project to another using the Langfuse Python SDK v4.

Common use cases:

  • Migrating between cloud regions (US ↔ EU ↔ JP ↔ HIPAA)
  • Migrating from a self-hosted Langfuse to Langfuse Cloud

What gets migrated

  1. Score Configs
  2. Custom Model Definitions
  3. Prompts (all versions)
  4. Observations (traces + nested observations, via the OTLP endpoint so original start_time / end_time are preserved)
  5. Scores
  6. Datasets (items + run items, linked to migrated traces/observations)

What is not migrated

The Langfuse public API does not currently support programmatic creation of: LLM-as-a-Judge evaluator configurations, custom dashboards, users / RBAC / SSO, and project/organization settings. These must be recreated manually or copied through UI-level exports.

0. Setup

%pip install --quiet "langfuse>=4.0.0" "opentelemetry-sdk>=1.25.0" "opentelemetry-exporter-otlp-proto-http>=1.25.0"
import os

# All configuration flows through environment variables so the rest of the
# notebook reads one uniform source. Fill in your own keys before running.
_ENV_SETTINGS = {
    # --- SOURCE project credentials ---
    "LANGFUSE_SOURCE_PUBLIC_KEY": "pk-lf-..",
    "LANGFUSE_SOURCE_SECRET_KEY": "sk-lf-...",
    "LANGFUSE_SOURCE_HOST": "https://cloud.langfuse.com",  # e.g. US cloud
    # --- DESTINATION project credentials ---
    "LANGFUSE_DEST_PUBLIC_KEY": "pk-lf-..",
    "LANGFUSE_DEST_SECRET_KEY": "sk-lf-...",
    "LANGFUSE_DEST_HOST": "https://hipaa.cloud.langfuse.com",  # e.g. HIPAA cloud
    # --- Optional time filter for the observations migration (ISO 8601) ---
    # Leave empty to migrate all traces.
    "LANGFUSE_MIGRATE_FROM_TIMESTAMP": "2026-03-01T00:00:00Z",  # e.g. "2025-01-01T00:00:00Z"
    "LANGFUSE_MIGRATE_TO_TIMESTAMP": "2026-04-20T00:00:00Z",    # e.g. "2025-02-01T00:00:00Z"
}
os.environ.update(_ENV_SETTINGS)
import base64
import datetime as dt
import json
import re
import time
from hashlib import sha256
from typing import Any, Dict, List, Optional

from langfuse import Langfuse

SOURCE_CFG = {
    "public_key": os.environ["LANGFUSE_SOURCE_PUBLIC_KEY"],
    "secret_key": os.environ["LANGFUSE_SOURCE_SECRET_KEY"],
    "base_url": os.environ.get("LANGFUSE_SOURCE_HOST", "https://cloud.langfuse.com").rstrip("/"),
}
DEST_CFG = {
    "public_key": os.environ["LANGFUSE_DEST_PUBLIC_KEY"],
    "secret_key": os.environ["LANGFUSE_DEST_SECRET_KEY"],
    "base_url": os.environ.get("LANGFUSE_DEST_HOST", "https://cloud.langfuse.com").rstrip("/"),
}

# Sanity: source and destination must use different public keys. The Langfuse v4
# resource manager is a singleton keyed by public_key, so reusing a key would
# make the second client silently share the first client's base_url/secret_key.
# Use `raise` rather than `assert`: assertions are stripped when Python runs
# with -O, and this check must never be skipped.
if SOURCE_CFG["public_key"] == DEST_CFG["public_key"]:
    raise ValueError(
        "Source and destination must use different public keys. The Langfuse v4 "
        "resource manager is a singleton keyed by public_key; using the same key "
        "for both projects will cause the second client to reuse the first client's "
        "host and secret key."
    )

# Defensively clear any leaked Langfuse env vars that would otherwise be picked
# up by the SDK constructor. In particular, `LANGFUSE_BASE_URL` takes priority
# over the `host=` argument; `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY` are
# fallbacks. We always want the explicit per-project values above to win.
for _leak in (
    "LANGFUSE_BASE_URL",
    "LANGFUSE_HOST",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_TRACING_ENVIRONMENT",
    "LANGFUSE_RELEASE",
):
    os.environ.pop(_leak, None)

# Two independent Langfuse clients. `tracing_enabled=False` disables the
# background OTel exporter on each client so they only act as REST + helper
# wrappers. The observations section sets up its own dedicated OTLP pipeline.
# Using `base_url=` (not `host=`) because `base_url` has the highest precedence
# in the Langfuse constructor and cannot be overridden by env vars.
# `timeout=60` overrides the SDK default of 5s — trace.get() on large traces
# with many nested observations routinely needs more than 5s to respond.
src = Langfuse(**SOURCE_CFG, tracing_enabled=False, timeout=60)
dst = Langfuse(**DEST_CFG, tracing_enabled=False, timeout=60)

# Fail fast with explicit errors (again, not -O-strippable asserts) if either
# set of credentials cannot authenticate against its host.
if not src.auth_check():
    raise RuntimeError("Source credentials invalid")
if not dst.auth_check():
    raise RuntimeError("Destination credentials invalid")
print(f"Source:      {SOURCE_CFG['base_url']}")
print(f"Destination: {DEST_CFG['base_url']}")
# Shared utilities used across all sections.

_HEX32 = re.compile(r"^[0-9a-f]{32}$")
_HEX16 = re.compile(r"^[0-9a-f]{16}$")


def dumps(obj: Any) -> str:
    """Serialize `obj` to JSON without escaping non-ASCII text.

    `default=str` stringifies anything json can't encode natively;
    `ensure_ascii=False` keeps Japanese / non-ASCII content readable.
    """
    return json.dumps(obj, default=str, ensure_ascii=False)


def _hex_id(source_id: str, pattern: "re.Pattern[str]", length: int) -> str:
    """Lowercase `source_id` and keep it when it already matches `pattern`;
    otherwise derive a deterministic `length`-char hex ID via sha256 so
    re-runs always map the same source ID to the same destination ID."""
    lowered = source_id.lower()
    if pattern.match(lowered):
        return lowered
    return sha256(lowered.encode("utf-8")).hexdigest()[:length]


def to_otel_trace_id(source_trace_id: str) -> str:
    """Return a valid 32-hex-char Langfuse/OTel trace ID for a source ID.

    IDs that are already 32 lowercase hex chars are preserved verbatim in the
    destination project; anything else is hashed deterministically.
    """
    return _hex_id(source_trace_id, _HEX32, 32)


def to_otel_span_id(source_obs_id: str) -> str:
    """Return a valid 16-hex-char OTel span ID for a source observation ID."""
    return _hex_id(source_obs_id, _HEX16, 16)


import httpx

# HTTP status codes worth retrying: 429 (rate limit) plus transient 5xx.
_RETRIABLE_CODES = {429, 500, 502, 503, 504}
# Transient network errors we want to retry on (no status_code attribute).
_RETRIABLE_EXCEPTIONS = (
    httpx.TimeoutException,          # includes ReadTimeout / ConnectTimeout / WriteTimeout / PoolTimeout
    httpx.RemoteProtocolError,       # server closed connection mid-response
    httpx.ConnectError,              # DNS/TCP level failures
    httpx.NetworkError,              # umbrella catch for socket-level errors
    TimeoutError,                    # stdlib asyncio / socket timeouts
    ConnectionError,                 # stdlib connection resets
)


def with_retries(fn, *, max_retries: int = 5, base_sleep: float = 0.5):
    """Call `fn()` with exponential-backoff retries on transient failures.

    Retries on (a) Fern-generated HTTP errors whose `status_code` attribute is
    in `_RETRIABLE_CODES` ({429, 500, 502, 503, 504}), or (b) the transient
    network exception classes listed in `_RETRIABLE_EXCEPTIONS`. Everything
    else is raised immediately, as is the final failing attempt.

    Args:
        fn: Zero-argument callable to invoke.
        max_retries: Total number of attempts; must be >= 1.
        base_sleep: Initial backoff in seconds; doubles on each retry.
    """
    if max_retries < 1:
        # Guard: with the bare loop a non-positive value would silently skip
        # `fn` and return None instead of surfacing the caller's mistake.
        raise ValueError("max_retries must be >= 1")
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as exc:
            status = getattr(exc, "status_code", None)
            retriable = status in _RETRIABLE_CODES or isinstance(exc, _RETRIABLE_EXCEPTIONS)
            if not retriable or attempt == max_retries - 1:
                raise
            sleep_for = base_sleep * (2 ** attempt)
            label = status if status is not None else type(exc).__name__
            print(f"    retriable {label} — sleeping {sleep_for:.1f}s")
            time.sleep(sleep_for)


def parse_ts(s: Optional[str]) -> Optional[dt.datetime]:
    """Parse an ISO-8601 timestamp into an aware UTC datetime (or None).

    A trailing 'Z' is rewritten to '+00:00' for `fromisoformat`; timestamps
    without any offset are treated as UTC.
    """
    if not s:
        return None
    normalized = s[:-1] + "+00:00" if s.endswith("Z") else s
    parsed = dt.datetime.fromisoformat(normalized)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed


FROM_TS = parse_ts(os.environ.get("LANGFUSE_MIGRATE_FROM_TIMESTAMP") or None)
TO_TS = parse_ts(os.environ.get("LANGFUSE_MIGRATE_TO_TIMESTAMP") or None)
print(f"Time filter: from={FROM_TS}  to={TO_TS}")

1. Score Configs

Score configs must be migrated first so that scores and annotation queues created in the destination can reference them by ID. We keep a config_id_map to remap references in later sections.

from langfuse.api import ConfigCategory

# Map of source config ID -> destination config ID; the Scores section uses
# it to remap `config_id` references.
config_id_map: Dict[str, str] = {}

# Index ALL existing destination configs by name, walking every page.
# (A single `get(limit=100)` call only sees the first 100 configs, so a
# destination with more would get duplicates instead of idempotent skips.)
existing_by_name: Dict[str, Any] = {}
_dst_page = 1
while True:
    _dst_resp = with_retries(lambda: dst.api.score_configs.get(page=_dst_page, limit=100))
    if not _dst_resp.data:
        break
    for _c in _dst_resp.data:
        existing_by_name[_c.name] = _c
    if _dst_page >= getattr(_dst_resp.meta, "total_pages", _dst_page):
        break
    _dst_page += 1

page = 1
migrated = skipped = failed = 0
while True:
    resp = with_retries(lambda: src.api.score_configs.get(page=page, limit=100))
    if not resp.data:
        break
    for cfg in resp.data:
        # Same-named config already on the destination: record the mapping
        # and skip the create so re-runs stay idempotent.
        if cfg.name in existing_by_name:
            config_id_map[cfg.id] = existing_by_name[cfg.name].id
            skipped += 1
            continue
        try:
            # The Langfuse API only accepts `categories` for CATEGORICAL configs
            # (BOOLEAN categories are auto-generated server-side) and only
            # accepts `min_value` / `max_value` for NUMERIC configs. Build the
            # kwargs conditionally so we don't send fields the server rejects.
            create_kwargs: Dict[str, Any] = {
                "name": cfg.name,
                "data_type": cfg.data_type,
                "description": cfg.description,
            }
            if cfg.data_type == "CATEGORICAL" and cfg.categories:
                create_kwargs["categories"] = [
                    ConfigCategory(label=c.label, value=c.value) for c in cfg.categories
                ]
            if cfg.data_type == "NUMERIC":
                if cfg.min_value is not None:
                    create_kwargs["min_value"] = cfg.min_value
                if cfg.max_value is not None:
                    create_kwargs["max_value"] = cfg.max_value
            created = with_retries(lambda: dst.api.score_configs.create(**create_kwargs))
            config_id_map[cfg.id] = created.id
            migrated += 1
        except Exception as exc:
            print(f"  failed to migrate score config '{cfg.name}': {exc}")
            failed += 1
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

print(f"Score configs — migrated: {migrated}, skipped (already present): {skipped}, failed: {failed}")

2. Custom Model Definitions

Only user-defined models are migrated. Langfuse-managed models are skipped — they exist on every project by default.

# Walk every page of the source model list and re-create only user-defined
# models on the destination.
page = 1
migrated = skipped = failed = 0
while True:
    resp = with_retries(lambda: src.api.models.list(page=page, limit=100))
    if not resp.data:
        break
    for m in resp.data:
        # Langfuse-managed models exist on every project by default — skip.
        if getattr(m, "is_langfuse_managed", False):
            skipped += 1
            continue
        try:
            # The server rejects requests that mix flat prices with
            # pricing_tiers. If the source uses pricing_tiers (e.g. token-
            # tiered models like gpt-image-1), forward them and omit the
            # flat prices; otherwise send only the flat prices.
            create_kwargs: Dict[str, Any] = {
                "model_name": m.model_name,
                "match_pattern": m.match_pattern,
                "start_date": m.start_date,
                "unit": m.unit,
                "tokenizer_id": m.tokenizer_id,
                "tokenizer_config": m.tokenizer_config,
            }
            if getattr(m, "pricing_tiers", None):
                create_kwargs["pricing_tiers"] = m.pricing_tiers
            else:
                if m.input_price is not None:
                    create_kwargs["input_price"] = m.input_price
                if m.output_price is not None:
                    create_kwargs["output_price"] = m.output_price
                if m.total_price is not None:
                    create_kwargs["total_price"] = m.total_price
            with_retries(lambda: dst.api.models.create(**create_kwargs))
            migrated += 1
        except Exception as exc:
            # A 409 / "already exists" means a previous run created this
            # model — treat it as an idempotent skip, not a failure.
            if getattr(exc, "status_code", None) == 409 or "already exists" in str(exc).lower():
                skipped += 1
            else:
                print(f"  failed to migrate model '{m.model_name}': {exc}")
                failed += 1
    # `total_pages` may be absent on the response meta; defaulting to `page`
    # ends the loop after the current page in that case.
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

print(f"Custom models — migrated: {migrated}, skipped: {skipped}, failed: {failed}")

3. Prompts

Every version of every prompt is migrated. The destination assigns fresh version numbers; the original source version is recorded in commit_message so it stays traceable. Tags and labels are preserved as-is.

from urllib.parse import quote

from langfuse.api import (
    CreateChatPromptRequest,
    CreateChatPromptType,
    CreateTextPromptRequest,
    CreateTextPromptType,
    Prompt_Chat,
    Prompt_Text,
)

# Phase 1: collect every (name, version) pair from the paginated listing.
page = 1
prompt_versions = []  # (name, version) tuples
while True:
    resp = with_retries(lambda: src.api.prompts.list(page=page, limit=100))
    if not resp.data:
        break
    for meta in resp.data:
        for v in meta.versions:
            prompt_versions.append((meta.name, v))
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

# Create versions in ascending order per prompt so the destination's fresh
# version numbers follow the source's chronological order.
prompt_versions.sort(key=lambda x: (x[0], x[1]))
print(f"Found {len(prompt_versions)} prompt versions on source")

# Phase 2: fetch each version from the source and re-create it on the
# destination, preserving labels/tags/config and recording the original
# version number in the commit message.
migrated = failed = 0
for name, version in prompt_versions:
    try:
        # Folder prompts (names containing '/') must have their path segments
        # URL-encoded on the GET endpoint — the Fern client does not do this
        # automatically. The request body on create() keeps the raw name.
        encoded_name = quote(name, safe="")
        p = with_retries(lambda: src.api.prompts.get(encoded_name, version=version))
        commit_msg = f"Migrated from source. Original version: {version}."
        if isinstance(p, Prompt_Chat):
            req = CreateChatPromptRequest(
                name=p.name,
                type=CreateChatPromptType.CHAT,
                prompt=p.prompt,
                labels=p.labels or [],
                tags=p.tags or [],
                config=p.config,
                commit_message=commit_msg,
            )
        elif isinstance(p, Prompt_Text):
            req = CreateTextPromptRequest(
                name=p.name,
                type=CreateTextPromptType.TEXT,
                prompt=p.prompt,
                labels=p.labels or [],
                tags=p.tags or [],
                config=p.config,
                commit_message=commit_msg,
            )
        else:
            # Unknown prompt subtype — count as failed rather than guessing.
            print(f"  unknown prompt type for {name} v{version}: {type(p).__name__}")
            failed += 1
            continue
        with_retries(lambda: dst.api.prompts.create(request=req))
        migrated += 1
    except Exception as exc:
        print(f"  failed {name} v{version}: {exc}")
        failed += 1

print(f"Prompts — migrated: {migrated}, failed: {failed}")

4. Observations (via OTLP ingestion endpoint)

This is the core of the migration. We push each source trace (and its nested observations) into the destination's OTLP ingestion endpoint at /api/public/otel/v1/traces. The OTLP endpoint is the only way to set the authoritative start_time / end_time of an observation at ingest — the legacy REST ingestion rewrites those fields.

Self-hosted deployments: the v2 Observations API is currently Cloud-only. If you are migrating from a self-hosted Langfuse instance, replace the observations.get_many(...) calls below with src.api.trace.get(trace.id).observations (which hits the v1 endpoint). A Langfuse self-hosted migration path for v2 is on the roadmap.

from opentelemetry import trace as otel_trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags

# --- Build a dedicated OTLP pipeline that points at the destination project ---
# The OTLP/HTTP exporter authenticates with HTTP Basic auth:
# base64("<public_key>:<secret_key>").
_auth = base64.b64encode(
    f"{DEST_CFG['public_key']}:{DEST_CFG['secret_key']}".encode("utf-8")
).decode("ascii")

# SimpleSpanProcessor exports each span synchronously as it ends — slower
# than batching, but spans are never silently dropped mid-migration.
otlp_provider = TracerProvider(resource=Resource.create({"service.name": "langfuse-migration"}))
otlp_provider.add_span_processor(
    SimpleSpanProcessor(
        OTLPSpanExporter(
            endpoint=f"{DEST_CFG['base_url']}/api/public/otel/v1/traces",
            headers={
                "Authorization": f"Basic {_auth}",
                "x-langfuse-ingestion-version": "4",
            },
        )
    )
)
migration_tracer = otlp_provider.get_tracer("langfuse-migration")


def _to_ns(d: Optional[dt.datetime]) -> Optional[int]:
    if d is None:
        return None
    if d.tzinfo is None:
        d = d.replace(tzinfo=dt.timezone.utc)
    return int(d.timestamp() * 1_000_000_000)


def _set_scalar(span, key: str, value: Any) -> None:
    """Set an OTel attribute on `span` stringifying non-primitive values via UTF-8 JSON."""
    if value is None:
        return
    if isinstance(value, (str, bool, int, float)):
        span.set_attribute(key, value)
        return
    span.set_attribute(key, dumps(value))


def _as_attr_string(value: Any) -> Optional[str]:
    """Coerce an input/output/metadata value to a JSON attribute string.

    The Observations API v2 returns input/output/metadata as raw JSON strings,
    while the Trace listing returns them as parsed dicts/lists. Pass strings
    through unchanged so we don't end up double-encoded; JSON-encode
    everything else with ensure_ascii=False (Japanese-safe).
    """
    if value is None:
        return None
    if isinstance(value, str):
        return value
    return dumps(value)


def _set_trace_attrs(span, trace_obj) -> None:
    """Set trace-level langfuse.* attributes on the root span.

    Scalar fields go through `_set_scalar` (drops Nones, JSON-encodes
    non-primitives); input/output go through `_as_attr_string` so values the
    API already returned as JSON strings are not double-encoded; dict
    metadata is flattened to one attribute per key.
    """
    _set_scalar(span, "langfuse.trace.name", trace_obj.name)
    _set_scalar(span, "langfuse.user.id", trace_obj.user_id)
    _set_scalar(span, "langfuse.session.id", trace_obj.session_id)
    _set_scalar(span, "langfuse.release", trace_obj.release)
    _set_scalar(span, "langfuse.version", trace_obj.version)
    _set_scalar(span, "langfuse.environment", trace_obj.environment or "default")
    if trace_obj.public is not None:
        span.set_attribute("langfuse.trace.public", bool(trace_obj.public))
    if trace_obj.tags:
        # Langfuse accepts both a string[] attribute and a JSON string.
        span.set_attribute("langfuse.trace.tags", list(trace_obj.tags))
    input_str = _as_attr_string(trace_obj.input)
    if input_str is not None:
        span.set_attribute("langfuse.trace.input", input_str)
    output_str = _as_attr_string(trace_obj.output)
    if output_str is not None:
        span.set_attribute("langfuse.trace.output", output_str)
    md = trace_obj.metadata
    if isinstance(md, dict):
        # One langfuse.trace.metadata.<key> attribute per dict entry.
        for k, v in md.items():
            _set_scalar(span, f"langfuse.trace.metadata.{k}", v)
    elif md is not None:
        # Non-dict metadata is stored whole under a single `_raw` key.
        _set_scalar(span, "langfuse.trace.metadata._raw", md)


# Maps raw source observation types to the value emitted as the
# `langfuse.observation.type` attribute. Agent/tool/chain/retriever/
# evaluator/guardrail observations are sent as plain spans; EMBEDDING maps
# to "generation" so its model/usage fields are carried over (see
# `_set_observation_attrs`, which only sets those for generations).
_OBS_TYPE_MAP = {
    "SPAN": "span",
    "GENERATION": "generation",
    "EVENT": "event",
    "AGENT": "span",
    "TOOL": "span",
    "CHAIN": "span",
    "RETRIEVER": "span",
    "EVALUATOR": "span",
    "EMBEDDING": "generation",
    "GUARDRAIL": "span",
}


def _set_observation_attrs(span, obs) -> None:
    """Set observation-level langfuse.* attributes on `span`.

    Maps the source observation type to span/generation/event via
    `_OBS_TYPE_MAP` (unknown types become "span"), copies scalar fields,
    input/output and metadata, and — for generations only — model name,
    model parameters, usage/cost details, completion start time, and the
    linked prompt reference.
    """
    obs_type = _OBS_TYPE_MAP.get(obs.type, "span")
    span.set_attribute("langfuse.observation.type", obs_type)
    _set_scalar(span, "langfuse.observation.level", obs.level)
    _set_scalar(span, "langfuse.observation.status_message", obs.status_message)
    _set_scalar(span, "langfuse.version", obs.version)
    _set_scalar(span, "langfuse.environment", obs.environment or "default")
    input_str = _as_attr_string(obs.input)
    if input_str is not None:
        span.set_attribute("langfuse.observation.input", input_str)
    output_str = _as_attr_string(obs.output)
    if output_str is not None:
        span.set_attribute("langfuse.observation.output", output_str)
    md = obs.metadata
    if isinstance(md, dict):
        # One langfuse.observation.metadata.<key> attribute per dict entry.
        for k, v in md.items():
            _set_scalar(span, f"langfuse.observation.metadata.{k}", v)
    elif md is not None:
        _set_scalar(span, "langfuse.observation.metadata._raw", md)
    # Generation-only attributes. ObservationV2 renames `model` to
    # `provided_model_name`; fall back to `model` for the v1 type used by
    # self-hosted migrations.
    if obs_type == "generation":
        model_name = getattr(obs, "provided_model_name", None) or getattr(obs, "model", None)
        _set_scalar(span, "langfuse.observation.model.name", model_name)
        if obs.model_parameters:
            span.set_attribute("langfuse.observation.model.parameters", dumps(obs.model_parameters))
        if obs.usage_details:
            span.set_attribute("langfuse.observation.usage_details", dumps(obs.usage_details))
        if obs.cost_details:
            span.set_attribute("langfuse.observation.cost_details", dumps(obs.cost_details))
        if obs.completion_start_time is not None:
            # Serialize as ISO-8601 with a 'Z' suffix; naive values are
            # assumed to be UTC.
            d = obs.completion_start_time
            if d.tzinfo is None:
                d = d.replace(tzinfo=dt.timezone.utc)
            span.set_attribute(
                "langfuse.observation.completion_start_time",
                d.isoformat().replace("+00:00", "Z"),
            )
        if getattr(obs, "prompt_name", None):
            _set_scalar(span, "langfuse.observation.prompt.name", obs.prompt_name)
        if getattr(obs, "prompt_version", None) is not None:
            span.set_attribute("langfuse.observation.prompt.version", int(obs.prompt_version))


def _parent_ctx(trace_id_hex: str, parent_span_id_hex: Optional[str]):
    """Build an OTel Context whose current span is a non-recording parent.

    Returns None when no parent span ID is given so the caller can start a
    span without an explicit parent context.
    """
    if not parent_span_id_hex:
        return None
    parent = NonRecordingSpan(
        SpanContext(
            trace_id=int(trace_id_hex, 16),
            span_id=int(parent_span_id_hex, 16),
            is_remote=True,
            trace_flags=TraceFlags(0x01),  # mark as sampled
        )
    )
    return otel_trace.set_span_in_context(parent)


# Source-ID -> destination-ID maps filled in during the observations
# migration and consumed by the Scores / Datasets sections to remap refs.
trace_id_map: Dict[str, str] = {}
obs_id_map: Dict[str, str] = {}

# Field groups we need from the v2 Observations API. `core` is always included.
# `basic` gives name/level/status/version/environment; `time` adds
# completionStartTime; `io` adds input/output; `metadata` adds metadata;
# `model` adds providedModelName/modelParameters; `usage` adds
# usageDetails/costDetails; `prompt` adds promptName/promptVersion.
OBSERVATIONS_FIELDS = "core,basic,time,io,metadata,model,usage,prompt"


def _fetch_observations_v2(source_trace_id: str) -> List[Any]:
    """Fetch all observations for a trace via the v2 API with cursor pagination.

    Cloud-only. For self-hosted migrations, replace this helper with:

        return with_retries(lambda: src.api.trace.get(source_trace_id)).observations
    """
    results: List[Any] = []
    cursor: Optional[str] = None
    while True:
        # The lambda re-reads `cursor` on each call, so retries inside
        # `with_retries` always re-request the same page.
        resp = with_retries(lambda: src.api.observations.get_many(
            trace_id=source_trace_id,
            fields=OBSERVATIONS_FIELDS,
            limit=1000,
            cursor=cursor,
        ))
        if resp.data:
            results.extend(resp.data)
        # A missing/empty cursor in the response meta marks the last page.
        cursor = getattr(resp.meta, "cursor", None)
        if not cursor:
            break
    return results


def migrate_one_trace(trace_obj) -> int:
    """Migrate one trace + its observations. Returns the number of child observations migrated.

    Creates one synthetic root span carrying the trace-level attributes, then
    one child span per source observation, preserving original start/end
    timestamps and the parent/child nesting. Destination IDs are recorded in
    `trace_id_map` / `obs_id_map` for the later Scores / Datasets sections.
    """
    observations = _fetch_observations_v2(trace_obj.id)
    new_trace_hex = to_otel_trace_id(trace_obj.id)
    trace_id_map[trace_obj.id] = new_trace_hex

    # Sort observations: parents before children, earliest first.
    # NOTE(review): this relies on a parent's start_time never being later
    # than its children's — confirm for out-of-order instrumentation, since
    # a child processed before its parent would fall back to the root as
    # parent (see the obs_id_map lookup below).
    observations.sort(key=lambda o: (o.start_time, o.id))

    # Root span carrying trace-level attributes. If the source has no explicit
    # root observation we synthesize one with the trace's own timestamps.
    root_span_hex = to_otel_span_id(trace_obj.id)  # deterministic root span id
    trace_start = min((o.start_time for o in observations), default=trace_obj.timestamp)
    trace_end = max(
        (o.end_time or o.start_time for o in observations), default=trace_obj.timestamp
    )

    # --- Root span ---
    root_parent_sc = SpanContext(
        trace_id=int(new_trace_hex, 16),
        span_id=int(root_span_hex, 16),
        is_remote=True,
        trace_flags=TraceFlags(0x01),
    )
    # Root span is created with its own SpanContext as parent context so the
    # trace_id is forced. We then create a child span underneath it that
    # *acts* as the root observation. This keeps the code uniform for nesting.
    root_ctx = otel_trace.set_span_in_context(NonRecordingSpan(root_parent_sc))
    root_span = migration_tracer.start_span(
        name=trace_obj.name or "trace",
        start_time=_to_ns(trace_start),
        context=root_ctx,
    )
    try:
        _set_trace_attrs(root_span, trace_obj)
        root_span.set_attribute("langfuse.observation.type", "span")
        # Reserve the root span's span_id so children under source-root
        # observations (parent_observation_id is None) nest beneath it.
        actual_root_span_id_hex = format(root_span.get_span_context().span_id, "016x")

        for obs in observations:
            new_span_hex = to_otel_span_id(obs.id)
            obs_id_map[obs.id] = new_span_hex
            # Source-root observations hang off the synthetic root span;
            # nested observations hang off their (already-mapped) parent.
            parent_hex = (
                obs_id_map.get(obs.parent_observation_id)
                if obs.parent_observation_id
                else actual_root_span_id_hex
            )
            child_ctx = _parent_ctx(new_trace_hex, parent_hex)
            child = migration_tracer.start_span(
                name=obs.name or obs.type.lower(),
                start_time=_to_ns(obs.start_time),
                context=child_ctx,
            )
            try:
                _set_observation_attrs(child, obs)
            finally:
                # Events / still-open observations have no end_time; fall
                # back to start_time so the span is zero-length, not open.
                child.end(end_time=_to_ns(obs.end_time) or _to_ns(obs.start_time))
    finally:
        root_span.end(end_time=_to_ns(trace_end))

    return len(observations)


# --- Drive the migration ---
# Page through the source trace listing (oldest first) and push every trace
# through migrate_one_trace, printing periodic progress.
page = 1
limit = 50
total = 0
total_obs = 0
failures = 0
progress_every = 10  # print a progress line every N traces
t_start = time.time()
while True:
    # `fields="core,io"` on trace.list ensures we get input/output/metadata
    # in the listing response and don't need a second call per trace.
    listing = with_retries(lambda: src.api.trace.list(
        page=page,
        limit=limit,
        order_by="timestamp.asc",
        from_timestamp=FROM_TS,
        to_timestamp=TO_TS,
        fields="core,io",
    ))
    if not listing.data:
        break

    total_items = getattr(listing.meta, "total_items", None)
    total_pages = getattr(listing.meta, "total_pages", None)
    if page == 1 and total_items is not None:
        print(f"Starting migration: {total_items} trace(s) across {total_pages} page(s)")

    for t in listing.data:
        try:
            obs_count = migrate_one_trace(t)
            total += 1
            total_obs += obs_count
            if total % progress_every == 0:
                elapsed = time.time() - t_start
                rate = total / elapsed if elapsed > 0 else 0
                progress = (
                    f"{total}/{total_items}" if total_items is not None else str(total)
                )
                print(
                    f"  [{progress} traces] observations={total_obs} "
                    f"failed={failures} rate={rate:.1f} traces/s"
                )
        except Exception as exc:
            # One bad trace must not abort the whole migration — log, count,
            # and continue with the next trace.
            print(f"  failed trace {t.id}: {exc}")
            failures += 1

    print(
        f"  page {page}/{total_pages or '?'} done — "
        f"cumulative: traces={total}, observations={total_obs}, failed={failures}"
    )
    if total_pages is not None and page >= total_pages:
        break
    page += 1

# Ensure any spans still buffered in the exporter reach the destination.
otlp_provider.force_flush()
elapsed = time.time() - t_start
print(
    f"\nObservations — migrated: {total} traces / {total_obs} observations, "
    f"failed: {failures}, elapsed: {elapsed:.1f}s"
)
print(f"  trace_id_map entries: {len(trace_id_map)}")
print(f"  obs_id_map entries:   {len(obs_id_map)}")

5. Scores

Scores are fetched from the source via api.scores.get_many and re-created on the destination via the native create_score helper. Original id is preserved so re-runs are idempotent. config_id is remapped through config_id_map from Section 1; trace_id / observation_id are remapped through trace_id_map / obs_id_map from Section 4.

# Re-hydrate the ID maps so this cell also works when run standalone; with
# empty maps the `.get(x, x)` fallbacks below pass source IDs through as-is.
trace_id_map = globals().get("trace_id_map", {})
obs_id_map = globals().get("obs_id_map", {})
config_id_map = globals().get("config_id_map", {})

page = 1
migrated = failed = skipped = 0
while True:
    resp = with_retries(lambda: src.api.scores.get_many(
        page=page,
        limit=100,
        from_timestamp=FROM_TS,
        to_timestamp=TO_TS,
    ))
    if not resp.data:
        break
    for s in resp.data:
        # Remap trace/observation/config references to destination IDs;
        # fall back to the source ID when no mapping entry exists.
        new_trace_id = trace_id_map.get(s.trace_id, s.trace_id) if s.trace_id else None
        new_obs_id = obs_id_map.get(s.observation_id, s.observation_id) if s.observation_id else None
        new_cfg_id = config_id_map.get(s.config_id, s.config_id) if s.config_id else None

        # Determine the typed value. CATEGORICAL scores use the string label;
        # only stringify the numeric `value` when it is actually present.
        # (Bug fix: the previous unconditional `str(s.value)` turned a null
        # value into the literal string "None", bypassing the null-skip.)
        if s.data_type == "CATEGORICAL":
            value = getattr(s, "string_value", None) or (
                str(s.value) if s.value is not None else None
            )
        else:
            value = s.value

        # Scores without any usable value cannot be re-created — skip them.
        if value is None:
            skipped += 1
            continue

        try:
            with_retries(lambda: dst.create_score(
                name=s.name,
                value=value,
                data_type=s.data_type,
                trace_id=new_trace_id,
                observation_id=new_obs_id,
                session_id=getattr(s, "session_id", None),
                dataset_run_id=getattr(s, "dataset_run_id", None),
                score_id=s.id,  # preserves ID for idempotent re-runs
                comment=s.comment,
                config_id=new_cfg_id,
                metadata=s.metadata,
                timestamp=s.timestamp,
            ))
            migrated += 1
        except Exception as exc:
            print(f"  failed score {s.id} ({s.name}): {exc}")
            failed += 1
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

# Flush scores (create_score queues events on the dst client)
dst.flush()
print(f"Scores — migrated: {migrated}, skipped (null value): {skipped}, failed: {failed}")

6. Datasets (items + run items)

We migrate datasets themselves via the native create_dataset helper, items via create_dataset_item (preserving source item IDs), and run items via the REST endpoint. Dataset run items reference migrated traces/observations using trace_id_map / obs_id_map.

# Fallback to empty maps when this cell is run standalone (see Scores section).
# With empty maps the `.get(x, x)` lookups pass source IDs through unchanged.
trace_id_map = globals().get("trace_id_map", {})
obs_id_map = globals().get("obs_id_map", {})

from urllib.parse import quote as _urlquote


def _enc(name: str) -> str:
    return _urlquote(name, safe="")


# Per-entity counters for the migration summary printed at the end.
ds_migrated = 0    # datasets newly created in the destination
ds_skipped = 0     # datasets that already existed in the destination
ds_failed = 0      # datasets that could be neither looked up nor created
items_migrated = 0  # dataset items copied over
items_failed = 0    # dataset items that raised on create
runs_created = 0    # dataset run items linked in the destination
runs_failed = 0     # run items that failed or had no migrated trace/obs

t_start = time.time()  # wall-clock start for the elapsed-time report
ds_seen = 0  # count of source datasets processed (new + existing)

# Main dataset migration: pages through all source datasets and, for each one,
# (1) ensures the dataset exists in the destination, (2) copies its items
# preserving source IDs, and (3) re-links its run items to the migrated
# traces/observations via trace_id_map / obs_id_map.
page_ds = 1
while True:
    # The lambda captures page_ds but is invoked immediately by with_retries,
    # so there is no late-binding hazard despite being created inside a loop.
    dsresp = with_retries(lambda: src.api.datasets.list(page=page_ds, limit=100))
    if not dsresp.data:
        break

    # Pagination metadata may be absent depending on server version; fall
    # back to None and skip the banner in that case.
    total_ds = getattr(dsresp.meta, "total_items", None)
    total_pages = getattr(dsresp.meta, "total_pages", None)
    if page_ds == 1 and total_ds is not None:
        print(f"Starting dataset migration: {total_ds} dataset(s) across {total_pages} page(s)")

    for ds in dsresp.data:
        ds_seen += 1
        progress = f"{ds_seen}/{total_ds}" if total_ds is not None else str(ds_seen)
        print(f"  [{progress}] '{ds.name}'")

        # Create dataset if not present. Path parameters must be URL-encoded
        # for folder datasets (names containing "/").
        # NOTE(review): this lookup is deliberately NOT wrapped in
        # with_retries — presumably so an expected 404 ("not yet migrated")
        # is not retried; a transient error here counts as ds_failed. Confirm
        # with_retries' retry predicate before changing this.
        try:
            dst.api.datasets.get(dataset_name=_enc(ds.name))
            ds_skipped += 1
            print(f"    dataset already exists in destination — skipping create")
        except Exception as exc:
            # Anything other than "not found" means we can't tell whether the
            # dataset exists, so skip it rather than risk duplicating data.
            if getattr(exc, "status_code", None) != 404:
                print(f"    failed dataset lookup: {exc}")
                ds_failed += 1
                continue
            try:
                with_retries(lambda: dst.create_dataset(
                    name=ds.name, description=ds.description, metadata=ds.metadata
                ))
                ds_migrated += 1
                print(f"    created dataset in destination")
            except Exception as exc2:
                print(f"    failed to create dataset: {exc2}")
                ds_failed += 1
                continue

        # Items (preserve source IDs so run items link correctly)
        # Snapshot the global counters so per-dataset deltas can be printed.
        ds_items_before = items_migrated
        ds_items_failed_before = items_failed
        page_item = 1
        while True:
            # NOTE(review): dataset_name is passed raw here (no _enc), unlike
            # the path-parameter calls above — presumably the SDK sends it as
            # a query/body parameter that the HTTP client encodes itself.
            # Verify against the SDK for folder datasets containing "/".
            iresp = with_retries(lambda: src.api.dataset_items.list(
                dataset_name=ds.name, page=page_item, limit=100
            ))
            if not iresp.data:
                break
            for it in iresp.data:
                try:
                    # Re-using the source item ID makes re-runs idempotent and
                    # lets run items reference items by their original IDs.
                    # source_trace/observation links are remapped to the
                    # migrated IDs, falling back to the original when unmapped.
                    with_retries(lambda: dst.create_dataset_item(
                        dataset_name=ds.name,
                        id=it.id,
                        input=it.input,
                        expected_output=it.expected_output,
                        metadata=it.metadata,
                        source_trace_id=trace_id_map.get(it.source_trace_id, it.source_trace_id) if it.source_trace_id else None,
                        source_observation_id=obs_id_map.get(it.source_observation_id, it.source_observation_id) if it.source_observation_id else None,
                        status=it.status,
                    ))
                    items_migrated += 1
                except Exception as exc:
                    print(f"      failed item {it.id}: {exc}")
                    items_failed += 1
            # Defaulting total_pages to the current page stops the loop when
            # the server omits pagination metadata.
            if page_item >= getattr(iresp.meta, "total_pages", page_item):
                break
            page_item += 1
        ds_items = items_migrated - ds_items_before
        ds_items_fail = items_failed - ds_items_failed_before
        print(f"    items: {ds_items} migrated, {ds_items_fail} failed")

        # Run items. Path params (dataset_name, run_name) need URL-encoding.
        ds_runs_before = runs_created
        ds_runs_failed_before = runs_failed
        run_count = 0
        page_run = 1
        while True:
            rresp = with_retries(lambda: src.api.datasets.get_runs(
                dataset_name=_enc(ds.name), page=page_run, limit=100
            ))
            if not rresp.data:
                break
            for run_meta in rresp.data:
                run_count += 1
                # The runs listing only carries metadata; fetch the full run
                # to get its dataset_run_items.
                try:
                    run_full = with_retries(lambda: src.api.datasets.get_run(
                        dataset_name=_enc(ds.name), run_name=_enc(run_meta.name)
                    ))
                except Exception as exc:
                    print(f"      failed run fetch '{run_meta.name}': {exc}")
                    runs_failed += 1
                    continue
                for ri in run_full.dataset_run_items:
                    # Remap to migrated IDs; fall back to the source ID when
                    # the map has no entry (e.g. trace outside the time filter).
                    new_trace = trace_id_map.get(ri.trace_id, ri.trace_id) if ri.trace_id else None
                    new_obs = obs_id_map.get(ri.observation_id, ri.observation_id) if ri.observation_id else None
                    # A run item must point at something; without either
                    # reference it is silently counted as failed.
                    if not new_trace and not new_obs:
                        runs_failed += 1
                        continue
                    try:
                        # Creating a run item with an existing run_name appends
                        # to that run; run description/metadata come from the
                        # source run's metadata object.
                        with_retries(lambda: dst.api.dataset_run_items.create(
                            run_name=run_meta.name,
                            dataset_item_id=ri.dataset_item_id,
                            trace_id=new_trace,
                            observation_id=new_obs,
                            run_description=run_meta.description,
                            metadata=run_meta.metadata,
                        ))
                        runs_created += 1
                    except Exception as exc:
                        print(f"      failed run item for run '{run_meta.name}': {exc}")
                        runs_failed += 1
            if page_run >= getattr(rresp.meta, "total_pages", page_run):
                break
            page_run += 1
        ds_runs = runs_created - ds_runs_before
        ds_runs_fail = runs_failed - ds_runs_failed_before
        print(
            f"    runs: {run_count} run(s), {ds_runs} run item(s) linked, "
            f"{ds_runs_fail} failed"
        )

    if page_ds >= getattr(dsresp.meta, "total_pages", page_ds):
        break
    page_ds += 1

# One-line summary of the whole dataset migration, with wall-clock runtime.
elapsed = time.time() - t_start
_summary_parts = [
    f"\nDatasets — new: {ds_migrated}, existing: {ds_skipped}, failed: {ds_failed}",
    f"items: {items_migrated} migrated / {items_failed} failed",
    f"run items: {runs_created} linked / {runs_failed} failed",
    f"elapsed: {elapsed:.1f}s",
]
print("  |  ".join(_summary_parts))

Was this page helpful?