Migrating Data Between Langfuse Projects (Python SDK v4)
This notebook migrates data from one Langfuse project to another using the Langfuse Python SDK v4.
Common use cases:
- Migrating between cloud regions (US ↔ EU ↔ JP ↔ HIPAA)
- Migrating from a self-hosted Langfuse to Langfuse Cloud
What gets migrated
- Score Configs
- Custom Model Definitions
- Prompts (all versions)
- Observations (traces + nested observations, via the OTLP endpoint so original
start_time/end_time are preserved)
- Scores
- Datasets (items + run items, linked to migrated traces/observations)
What is not migrated
The Langfuse public API does not currently support programmatic creation of: LLM-as-a-Judge evaluator configurations, custom dashboards, users / RBAC / SSO, and project/organization settings. These must be recreated manually or copied through UI-level exports.
0. Setup
# Setup cell: pin the SDK versions this notebook relies on, then configure
# both projects purely through environment variables so every later cell can
# run standalone.
# NOTE(review): the "%pip ...import os" and "...Z\"import base64" lines below
# are fused notebook-cell boundaries from the export; in the original
# notebook they are separate cells/lines.
%pip install --quiet "langfuse>=4.0.0" "opentelemetry-sdk>=1.25.0" "opentelemetry-exporter-otlp-proto-http>=1.25.0"import os
# --- SOURCE project credentials ---
os.environ["LANGFUSE_SOURCE_PUBLIC_KEY"] = "pk-lf-.."
os.environ["LANGFUSE_SOURCE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_SOURCE_HOST"] = "https://cloud.langfuse.com" # e.g. US cloud
# --- DESTINATION project credentials ---
os.environ["LANGFUSE_DEST_PUBLIC_KEY"] = "pk-lf-.."
os.environ["LANGFUSE_DEST_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_DEST_HOST"] = "https://hipaa.cloud.langfuse.com" # e.g. HIPAA cloud
# --- Optional time filter for the observations migration (ISO 8601) ---
# Leave empty to migrate all traces.
os.environ["LANGFUSE_MIGRATE_FROM_TIMESTAMP"] = "2026-03-01T00:00:00Z" # e.g. "2025-01-01T00:00:00Z"
os.environ["LANGFUSE_MIGRATE_TO_TIMESTAMP"] = "2026-04-20T00:00:00Z" # e.g. "2025-02-01T00:00:00Z"import base64
import datetime as dt
import json
import re
import time
from hashlib import sha256
from typing import Any, Dict, List, Optional
from langfuse import Langfuse
# Explicit per-project credential dicts; `base_url` is normalized (no trailing
# slash) because it is later concatenated with API paths.
SOURCE_CFG = {
    "public_key": os.environ["LANGFUSE_SOURCE_PUBLIC_KEY"],
    "secret_key": os.environ["LANGFUSE_SOURCE_SECRET_KEY"],
    "base_url": os.environ.get("LANGFUSE_SOURCE_HOST", "https://cloud.langfuse.com").rstrip("/"),
}
DEST_CFG = {
    "public_key": os.environ["LANGFUSE_DEST_PUBLIC_KEY"],
    "secret_key": os.environ["LANGFUSE_DEST_SECRET_KEY"],
    "base_url": os.environ.get("LANGFUSE_DEST_HOST", "https://cloud.langfuse.com").rstrip("/"),
}
# Sanity: source and destination must use different public keys. The Langfuse v4
# resource manager is a singleton keyed by public_key, so reusing a key would
# make the second client silently share the first client's base_url/secret_key.
assert SOURCE_CFG["public_key"] != DEST_CFG["public_key"], (
    "Source and destination must use different public keys. The Langfuse v4 "
    "resource manager is a singleton keyed by public_key; using the same key "
    "for both projects will cause the second client to reuse the first client's "
    "host and secret key."
)
# Defensively clear any leaked Langfuse env vars that would otherwise be picked
# up by the SDK constructor. In particular, `LANGFUSE_BASE_URL` takes priority
# over the `host=` argument; `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY` are
# fallbacks. We always want the explicit per-project values above to win.
for _leak in (
    "LANGFUSE_BASE_URL",
    "LANGFUSE_HOST",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_TRACING_ENVIRONMENT",
    "LANGFUSE_RELEASE",
):
    os.environ.pop(_leak, None)
# Two independent Langfuse clients. `tracing_enabled=False` disables the
# background OTel exporter on each client so they only act as REST + helper
# wrappers. The observations section sets up its own dedicated OTLP pipeline.
# Using `base_url=` (not `host=`) because `base_url` has the highest precedence
# in the Langfuse constructor and cannot be overridden by env vars.
# `timeout=60` overrides the SDK default of 5s — trace.get() on large traces
# with many nested observations routinely needs more than 5s to respond.
src = Langfuse(**SOURCE_CFG, tracing_enabled=False, timeout=60)
dst = Langfuse(**DEST_CFG, tracing_enabled=False, timeout=60)
# NOTE(review): assert-based checks are stripped under `python -O`; fine in a
# notebook, but raise explicitly if this ever becomes a script.
assert src.auth_check(), "Source credentials invalid"
assert dst.auth_check(), "Destination credentials invalid"
print(f"Source: {SOURCE_CFG['base_url']}")
print(f"Destination: {DEST_CFG['base_url']}")# Shared utilities used across all sections.
_HEX32 = re.compile(r"^[0-9a-f]{32}$")  # shape of an already-valid OTel trace id
_HEX16 = re.compile(r"^[0-9a-f]{16}$")  # shape of an already-valid OTel span id


def dumps(obj: Any) -> str:
    """Serialize *obj* to JSON, keeping non-ASCII (e.g. Japanese) readable.

    Types the encoder does not know (datetimes, UUIDs, ...) are stringified
    via ``default=str``.
    """
    return json.dumps(obj, default=str, ensure_ascii=False)


def to_otel_trace_id(source_trace_id: str) -> str:
    """Map a source trace ID onto a valid 32-hex-char Langfuse/OTel trace ID.

    An ID that is already 32 lowercase hex chars passes through unchanged so
    the destination keeps the original ID; anything else is hashed (sha256),
    which makes re-runs derive the same destination ID deterministically.
    """
    candidate = source_trace_id.lower()
    if _HEX32.match(candidate):
        return candidate
    return sha256(candidate.encode("utf-8")).hexdigest()[:32]


def to_otel_span_id(source_obs_id: str) -> str:
    """Map a source observation ID onto a valid 16-hex-char OTel span ID.

    Same strategy as :func:`to_otel_trace_id`, truncated to 16 hex chars.
    """
    candidate = source_obs_id.lower()
    if _HEX16.match(candidate):
        return candidate
    return sha256(candidate.encode("utf-8")).hexdigest()[:16]
import httpx
# HTTP status codes worth retrying: rate limiting plus transient server errors.
_RETRIABLE_CODES = {429, 500, 502, 503, 504}
# Transient network errors we want to retry on (no status_code attribute).
_RETRIABLE_EXCEPTIONS = (
    httpx.TimeoutException,  # includes ReadTimeout / ConnectTimeout / WriteTimeout / PoolTimeout
    httpx.RemoteProtocolError,  # server closed connection mid-response
    httpx.ConnectError,  # DNS/TCP level failures
    httpx.NetworkError,  # umbrella catch for socket-level errors
    TimeoutError,  # stdlib asyncio / socket timeouts
    ConnectionError,  # stdlib connection resets
)
def with_retries(fn, *, max_retries: int = 5, base_sleep: float = 0.5):
    """Call *fn*, retrying with exponential backoff on transient failures.

    A failure is retried when the exception either carries a `status_code`
    in `_RETRIABLE_CODES` (429 / 5xx from the Fern-generated client) or is
    one of the transient network error classes in `_RETRIABLE_EXCEPTIONS`.
    Any other exception — and the final failed attempt — propagates to the
    caller unchanged.
    """
    for attempt in range(max_retries):
        last_attempt = attempt == max_retries - 1
        try:
            return fn()
        except Exception as exc:
            status = getattr(exc, "status_code", None)
            transient = isinstance(exc, _RETRIABLE_EXCEPTIONS) or status in _RETRIABLE_CODES
            if last_attempt or not transient:
                raise
            # Backoff doubles per failed attempt: base, 2*base, 4*base, ...
            label = status if status is not None else type(exc).__name__
            sleep_for = base_sleep * (2 ** attempt)
            print(f" retriable {label} — sleeping {sleep_for:.1f}s")
            time.sleep(sleep_for)
def parse_ts(s: Optional[str]) -> Optional[dt.datetime]:
    """Parse an ISO-8601 timestamp string into an aware datetime.

    Accepts a trailing 'Z' (normalized to '+00:00' for `fromisoformat`).
    Naive timestamps are assumed UTC; empty/None input yields None.
    """
    if not s:
        return None
    normalized = s[:-1] + "+00:00" if s.endswith("Z") else s
    parsed = dt.datetime.fromisoformat(normalized)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed
# Optional migration window, parsed once; None means unbounded on that side.
FROM_TS = parse_ts(os.environ.get("LANGFUSE_MIGRATE_FROM_TIMESTAMP") or None)
TO_TS = parse_ts(os.environ.get("LANGFUSE_MIGRATE_TO_TIMESTAMP") or None)
print(f"Time filter: from={FROM_TS} to={TO_TS}")1. Score Configs
Score configs must be migrated first so that scores and annotation queues created in the destination can reference them by ID. We keep a config_id_map to remap references in later sections.
from langfuse.api import ConfigCategory

# Section 1: copy score configs source -> destination, de-duplicated by name.
# `config_id_map` (source config id -> destination config id) is consumed by
# the Scores section to remap `config_id` references.
config_id_map: Dict[str, str] = {}
# NOTE(review): only the first 100 destination configs are fetched here; if
# the destination already holds more, paginate this lookup or raise the limit.
existing_by_name = {c.name: c for c in dst.api.score_configs.get(limit=100).data}
page = 1
migrated = skipped = failed = 0
while True:
    resp = with_retries(lambda: src.api.score_configs.get(page=page, limit=100))
    if not resp.data:
        break
    for cfg in resp.data:
        # Same-named config already present: reuse its id, skip creation.
        if cfg.name in existing_by_name:
            config_id_map[cfg.id] = existing_by_name[cfg.name].id
            skipped += 1
            continue
        try:
            # The Langfuse API only accepts `categories` for CATEGORICAL configs
            # (BOOLEAN categories are auto-generated server-side) and only
            # accepts `min_value` / `max_value` for NUMERIC configs. Build the
            # kwargs conditionally so we don't send fields the server rejects.
            create_kwargs: Dict[str, Any] = {
                "name": cfg.name,
                "data_type": cfg.data_type,
                "description": cfg.description,
            }
            if cfg.data_type == "CATEGORICAL" and cfg.categories:
                create_kwargs["categories"] = [
                    ConfigCategory(label=c.label, value=c.value) for c in cfg.categories
                ]
            if cfg.data_type == "NUMERIC":
                if cfg.min_value is not None:
                    create_kwargs["min_value"] = cfg.min_value
                if cfg.max_value is not None:
                    create_kwargs["max_value"] = cfg.max_value
            created = with_retries(lambda: dst.api.score_configs.create(**create_kwargs))
            config_id_map[cfg.id] = created.id
            migrated += 1
        except Exception as exc:
            print(f" failed to migrate score config '{cfg.name}': {exc}")
            failed += 1
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1
print(f"Score configs — migrated: {migrated}, skipped (already present): {skipped}, failed: {failed}")2. Custom Model Definitions
Only user-defined models are migrated. Langfuse-managed models are skipped — they exist on every project by default.
# Section 2: copy user-defined model price definitions. Langfuse-managed
# models are skipped — they exist on every project by default.
page = 1
migrated = skipped = failed = 0
while True:
    resp = with_retries(lambda: src.api.models.list(page=page, limit=100))
    if not resp.data:
        break
    for m in resp.data:
        if getattr(m, "is_langfuse_managed", False):
            skipped += 1
            continue
        try:
            # The server rejects requests that mix flat prices with
            # pricing_tiers. If the source uses pricing_tiers (e.g. token-
            # tiered models like gpt-image-1), forward them and omit the
            # flat prices; otherwise send only the flat prices.
            create_kwargs: Dict[str, Any] = {
                "model_name": m.model_name,
                "match_pattern": m.match_pattern,
                "start_date": m.start_date,
                "unit": m.unit,
                "tokenizer_id": m.tokenizer_id,
                "tokenizer_config": m.tokenizer_config,
            }
            if getattr(m, "pricing_tiers", None):
                create_kwargs["pricing_tiers"] = m.pricing_tiers
            else:
                if m.input_price is not None:
                    create_kwargs["input_price"] = m.input_price
                if m.output_price is not None:
                    create_kwargs["output_price"] = m.output_price
                if m.total_price is not None:
                    create_kwargs["total_price"] = m.total_price
            with_retries(lambda: dst.api.models.create(**create_kwargs))
            migrated += 1
        except Exception as exc:
            # Idempotent re-runs: a model that already exists counts as skipped.
            if getattr(exc, "status_code", None) == 409 or "already exists" in str(exc).lower():
                skipped += 1
            else:
                print(f" failed to migrate model '{m.model_name}': {exc}")
                failed += 1
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1
print(f"Custom models — migrated: {migrated}, skipped: {skipped}, failed: {failed}")3. Prompts
Every version of every prompt is migrated. The destination assigns fresh version numbers; the original source version is recorded in commit_message so it stays traceable. Tags and labels are preserved as-is.
from urllib.parse import quote
from langfuse.api import (
    CreateChatPromptRequest,
    CreateChatPromptType,
    CreateTextPromptRequest,
    CreateTextPromptType,
    Prompt_Chat,
    Prompt_Text,
)

# Section 3, phase 1: enumerate every (name, version) pair on the source.
page = 1
prompt_versions = []  # (name, version) tuples
while True:
    resp = with_retries(lambda: src.api.prompts.list(page=page, limit=100))
    if not resp.data:
        break
    for meta in resp.data:
        for v in meta.versions:
            prompt_versions.append((meta.name, v))
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1
# Create versions in ascending order per prompt so the destination's fresh
# version numbers follow the source's chronology.
prompt_versions.sort(key=lambda x: (x[0], x[1]))
print(f"Found {len(prompt_versions)} prompt versions on source")
# Phase 2: fetch each version and re-create it on the destination.
migrated = failed = 0
for name, version in prompt_versions:
    try:
        # Folder prompts (names containing '/') must have their path segments
        # URL-encoded on the GET endpoint — the Fern client does not do this
        # automatically. The request body on create() keeps the raw name.
        encoded_name = quote(name, safe="")
        p = with_retries(lambda: src.api.prompts.get(encoded_name, version=version))
        # The source version number is recorded in the commit message so
        # provenance stays traceable after the destination renumbers.
        commit_msg = f"Migrated from source. Original version: {version}."
        if isinstance(p, Prompt_Chat):
            req = CreateChatPromptRequest(
                name=p.name,
                type=CreateChatPromptType.CHAT,
                prompt=p.prompt,
                labels=p.labels or [],
                tags=p.tags or [],
                config=p.config,
                commit_message=commit_msg,
            )
        elif isinstance(p, Prompt_Text):
            req = CreateTextPromptRequest(
                name=p.name,
                type=CreateTextPromptType.TEXT,
                prompt=p.prompt,
                labels=p.labels or [],
                tags=p.tags or [],
                config=p.config,
                commit_message=commit_msg,
            )
        else:
            print(f" unknown prompt type for {name} v{version}: {type(p).__name__}")
            failed += 1
            continue
        with_retries(lambda: dst.api.prompts.create(request=req))
        migrated += 1
    except Exception as exc:
        print(f" failed {name} v{version}: {exc}")
        failed += 1
print(f"Prompts — migrated: {migrated}, failed: {failed}")4. Observations (via OTLP ingestion endpoint)
This is the core of the migration. We push each source trace (and its nested observations) into the destination's OTLP ingestion endpoint at /api/public/otel/v1/traces. The OTLP endpoint is the only way to set the authoritative start_time / end_time of an observation at ingest — the legacy REST ingestion rewrites those fields.
Self-hosted deployments: the v2 Observations API is currently Cloud-only. If you are migrating from a self-hosted Langfuse instance, replace the
observations.get_many(...) calls below with src.api.trace.get(trace.id).observations (which hits the v1 endpoint). A Langfuse self-hosted migration path for v2 is on the roadmap.
from opentelemetry import trace as otel_trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags

# --- Build a dedicated OTLP pipeline that points at the destination project ---
# The Langfuse OTLP endpoint authenticates with HTTP Basic auth built from the
# destination project's public:secret key pair.
_auth = base64.b64encode(
    f"{DEST_CFG['public_key']}:{DEST_CFG['secret_key']}".encode("utf-8")
).decode("ascii")
otlp_provider = TracerProvider(resource=Resource.create({"service.name": "langfuse-migration"}))
# SimpleSpanProcessor exports each span synchronously when it ends — slower
# than batching, but keeps emission ordered and surfaces export errors at the
# call site rather than in a background thread.
otlp_provider.add_span_processor(
    SimpleSpanProcessor(
        OTLPSpanExporter(
            endpoint=f"{DEST_CFG['base_url']}/api/public/otel/v1/traces",
            headers={
                "Authorization": f"Basic {_auth}",
                "x-langfuse-ingestion-version": "4",
            },
        )
    )
)
migration_tracer = otlp_provider.get_tracer("langfuse-migration")
def _to_ns(d: Optional[dt.datetime]) -> Optional[int]:
if d is None:
return None
if d.tzinfo is None:
d = d.replace(tzinfo=dt.timezone.utc)
return int(d.timestamp() * 1_000_000_000)
def _set_scalar(span, key: str, value: Any) -> None:
"""Set an OTel attribute on `span` stringifying non-primitive values via UTF-8 JSON."""
if value is None:
return
if isinstance(value, (str, bool, int, float)):
span.set_attribute(key, value)
return
span.set_attribute(key, dumps(value))
def _as_attr_string(value: Any) -> Optional[str]:
"""Coerce an input/output/metadata value to a JSON attribute string.
The Observations API v2 returns input/output/metadata as raw JSON strings,
while the Trace listing returns them as parsed dicts/lists. Pass strings
through unchanged so we don't end up double-encoded; JSON-encode
everything else with ensure_ascii=False (Japanese-safe).
"""
if value is None:
return None
if isinstance(value, str):
return value
return dumps(value)
def _set_trace_attrs(span, trace_obj) -> None:
    """Project trace-level fields onto langfuse.* attributes of the root span."""
    scalar_fields = (
        ("langfuse.trace.name", trace_obj.name),
        ("langfuse.user.id", trace_obj.user_id),
        ("langfuse.session.id", trace_obj.session_id),
        ("langfuse.release", trace_obj.release),
        ("langfuse.version", trace_obj.version),
        ("langfuse.environment", trace_obj.environment or "default"),
    )
    for key, val in scalar_fields:
        _set_scalar(span, key, val)
    if trace_obj.public is not None:
        span.set_attribute("langfuse.trace.public", bool(trace_obj.public))
    if trace_obj.tags:
        # Langfuse accepts both a string[] attribute and a JSON string here.
        span.set_attribute("langfuse.trace.tags", list(trace_obj.tags))
    # input/output: raw JSON strings pass through, parsed values get encoded.
    for key, raw in (
        ("langfuse.trace.input", trace_obj.input),
        ("langfuse.trace.output", trace_obj.output),
    ):
        text = _as_attr_string(raw)
        if text is not None:
            span.set_attribute(key, text)
    md = trace_obj.metadata
    if isinstance(md, dict):
        # Dict metadata is flattened into one attribute per key.
        for k, v in md.items():
            _set_scalar(span, f"langfuse.trace.metadata.{k}", v)
    elif md is not None:
        _set_scalar(span, "langfuse.trace.metadata._raw", md)
# Map the v2 API's richer observation-type taxonomy (upper-case) onto the
# primitive types sent at OTLP ingestion: agent-style types collapse to plain
# spans, EMBEDDING is handled like a generation. Unknown types fall back to
# "span" in _set_observation_attrs.
_OBS_TYPE_MAP = {
    "SPAN": "span",
    "GENERATION": "generation",
    "EVENT": "event",
    "AGENT": "span",
    "TOOL": "span",
    "CHAIN": "span",
    "RETRIEVER": "span",
    "EVALUATOR": "span",
    "EMBEDDING": "generation",
    "GUARDRAIL": "span",
}
def _set_observation_attrs(span, obs) -> None:
    """Project one source observation onto langfuse.observation.* attributes.

    Handles both ObservationV2 (Cloud v2 API) and the v1 observation type
    used by self-hosted migrations — see the model-name fallback below.
    """
    # Collapse the v2 type taxonomy to span/generation/event (default: span).
    obs_type = _OBS_TYPE_MAP.get(obs.type, "span")
    span.set_attribute("langfuse.observation.type", obs_type)
    _set_scalar(span, "langfuse.observation.level", obs.level)
    _set_scalar(span, "langfuse.observation.status_message", obs.status_message)
    _set_scalar(span, "langfuse.version", obs.version)
    _set_scalar(span, "langfuse.environment", obs.environment or "default")
    # input/output: raw JSON strings pass through unchanged; parsed values
    # are JSON-encoded (see _as_attr_string). None is omitted.
    input_str = _as_attr_string(obs.input)
    if input_str is not None:
        span.set_attribute("langfuse.observation.input", input_str)
    output_str = _as_attr_string(obs.output)
    if output_str is not None:
        span.set_attribute("langfuse.observation.output", output_str)
    md = obs.metadata
    if isinstance(md, dict):
        # Dict metadata is flattened into one attribute per key.
        for k, v in md.items():
            _set_scalar(span, f"langfuse.observation.metadata.{k}", v)
    elif md is not None:
        _set_scalar(span, "langfuse.observation.metadata._raw", md)
    # Generation-only attributes. ObservationV2 renames `model` to
    # `provided_model_name`; fall back to `model` for the v1 type used by
    # self-hosted migrations.
    if obs_type == "generation":
        model_name = getattr(obs, "provided_model_name", None) or getattr(obs, "model", None)
        _set_scalar(span, "langfuse.observation.model.name", model_name)
        if obs.model_parameters:
            span.set_attribute("langfuse.observation.model.parameters", dumps(obs.model_parameters))
        if obs.usage_details:
            span.set_attribute("langfuse.observation.usage_details", dumps(obs.usage_details))
        if obs.cost_details:
            span.set_attribute("langfuse.observation.cost_details", dumps(obs.cost_details))
        if obs.completion_start_time is not None:
            # Serialized as RFC 3339 with a trailing 'Z'; naive treated as UTC.
            d = obs.completion_start_time
            if d.tzinfo is None:
                d = d.replace(tzinfo=dt.timezone.utc)
            span.set_attribute(
                "langfuse.observation.completion_start_time",
                d.isoformat().replace("+00:00", "Z"),
            )
        if getattr(obs, "prompt_name", None):
            _set_scalar(span, "langfuse.observation.prompt.name", obs.prompt_name)
        if getattr(obs, "prompt_version", None) is not None:
            span.set_attribute("langfuse.observation.prompt.version", int(obs.prompt_version))
def _parent_ctx(trace_id_hex: str, parent_span_id_hex: Optional[str]):
    """Build an OTel Context whose current span is a remote parent stub.

    Returns None when no parent span id is given; `start_span(context=None)`
    then falls back to the ambient context.
    """
    if not parent_span_id_hex:
        return None
    stub = SpanContext(
        trace_id=int(trace_id_hex, 16),
        span_id=int(parent_span_id_hex, 16),
        is_remote=True,
        trace_flags=TraceFlags(0x01),  # sampled, so the exporter emits children
    )
    return otel_trace.set_span_in_context(NonRecordingSpan(stub))
# source-id -> destination-id mappings, filled by migrate_one_trace() and
# consumed by the Scores and Datasets sections to re-link references.
trace_id_map: Dict[str, str] = {}
obs_id_map: Dict[str, str] = {}
# Field groups we need from the v2 Observations API. `core` is always included.
# `basic` gives name/level/status/version/environment; `time` adds
# completionStartTime; `io` adds input/output; `metadata` adds metadata;
# `model` adds providedModelName/modelParameters; `usage` adds
# usageDetails/costDetails; `prompt` adds promptName/promptVersion.
OBSERVATIONS_FIELDS = "core,basic,time,io,metadata,model,usage,prompt"
def _fetch_observations_v2(source_trace_id: str) -> List[Any]:
    """Collect every observation of a trace from the v2 API (cursor-paged).

    Cloud-only. For self-hosted migrations, replace this helper with:
        return with_retries(lambda: src.api.trace.get(source_trace_id)).observations
    """
    collected: List[Any] = []
    cursor: Optional[str] = None
    while True:
        page = with_retries(lambda: src.api.observations.get_many(
            trace_id=source_trace_id,
            fields=OBSERVATIONS_FIELDS,
            limit=1000,
            cursor=cursor,
        ))
        if page.data:
            collected.extend(page.data)
        cursor = getattr(page.meta, "cursor", None)
        if not cursor:
            # No continuation cursor -> last page reached.
            return collected
def migrate_one_trace(trace_obj) -> int:
    """Migrate one source trace plus its observations to the destination.

    Re-emits the trace as an OTel span tree through `migration_tracer` (whose
    exporter targets the destination's OTLP endpoint) and records the ID
    mappings in `trace_id_map` / `obs_id_map` for the Scores and Datasets
    sections. Returns the number of child observations migrated.
    """
    observations = _fetch_observations_v2(trace_obj.id)
    new_trace_hex = to_otel_trace_id(trace_obj.id)
    trace_id_map[trace_obj.id] = new_trace_hex
    # Emit earliest-first so ingestion order roughly follows the original
    # timeline. Note this does NOT strictly guarantee parents before children
    # (equal start times, clock skew) — parent span ids are therefore derived,
    # not looked up, inside the loop below.
    observations.sort(key=lambda o: (o.start_time, o.id))
    # Root span carrying trace-level attributes. If the source has no explicit
    # root observation we synthesize one with the trace's own timestamps.
    root_span_hex = to_otel_span_id(trace_obj.id)  # deterministic root span id
    trace_start = min((o.start_time for o in observations), default=trace_obj.timestamp)
    trace_end = max(
        (o.end_time or o.start_time for o in observations), default=trace_obj.timestamp
    )
    # --- Root span ---
    # Starting the root span under a NonRecordingSpan with this SpanContext
    # forces the exported trace_id to `new_trace_hex`.
    root_parent_sc = SpanContext(
        trace_id=int(new_trace_hex, 16),
        span_id=int(root_span_hex, 16),
        is_remote=True,
        trace_flags=TraceFlags(0x01),
    )
    root_ctx = otel_trace.set_span_in_context(NonRecordingSpan(root_parent_sc))
    root_span = migration_tracer.start_span(
        name=trace_obj.name or "trace",
        start_time=_to_ns(trace_start),
        context=root_ctx,
    )
    try:
        _set_trace_attrs(root_span, trace_obj)
        root_span.set_attribute("langfuse.observation.type", "span")
        # The tracer assigns the root span a fresh span_id (root_span_hex only
        # pinned the trace_id above). Children of source-root observations
        # (parent_observation_id is None) must nest beneath this actual id.
        actual_root_span_id_hex = format(root_span.get_span_context().span_id, "016x")
        for obs in observations:
            new_span_hex = to_otel_span_id(obs.id)
            obs_id_map[obs.id] = new_span_hex
            # FIX: derive the parent span id deterministically from the source
            # parent id instead of reading obs_id_map. The previous lookup
            # returned None when a child sorted before its parent, which made
            # _parent_ctx return None and the child span escape the forced
            # trace id entirely (landing in a fresh, unrelated trace).
            # to_otel_span_id(x) always equals obs_id_map[x], so this is the
            # same mapping, just order-independent.
            parent_hex = (
                to_otel_span_id(obs.parent_observation_id)
                if obs.parent_observation_id
                else actual_root_span_id_hex
            )
            child_ctx = _parent_ctx(new_trace_hex, parent_hex)
            child = migration_tracer.start_span(
                name=obs.name or obs.type.lower(),
                start_time=_to_ns(obs.start_time),
                context=child_ctx,
            )
            try:
                _set_observation_attrs(child, obs)
            finally:
                # Events / zero-length spans: fall back to start_time as end.
                child.end(end_time=_to_ns(obs.end_time) or _to_ns(obs.start_time))
    finally:
        root_span.end(end_time=_to_ns(trace_end))
    return len(observations)
# --- Drive the migration ---
# Section 4 driver: page through source traces (oldest first, optionally
# bounded by FROM_TS/TO_TS) and re-emit each one via migrate_one_trace().
# NOTE(review): ascending-timestamp pagination assumes the source is not
# receiving new traces inside the window while this runs — confirm, or set
# TO_TS in the past.
page = 1
limit = 50
total = 0
total_obs = 0
failures = 0
progress_every = 10  # print a progress line every N traces
t_start = time.time()
while True:
    # `fields="core,io"` on trace.list ensures we get input/output/metadata
    # in the listing response and don't need a second call per trace.
    listing = with_retries(lambda: src.api.trace.list(
        page=page,
        limit=limit,
        order_by="timestamp.asc",
        from_timestamp=FROM_TS,
        to_timestamp=TO_TS,
        fields="core,io",
    ))
    if not listing.data:
        break
    total_items = getattr(listing.meta, "total_items", None)
    total_pages = getattr(listing.meta, "total_pages", None)
    if page == 1 and total_items is not None:
        print(f"Starting migration: {total_items} trace(s) across {total_pages} page(s)")
    for t in listing.data:
        # Per-trace isolation: one bad trace is reported and skipped, the
        # rest of the page continues.
        try:
            obs_count = migrate_one_trace(t)
            total += 1
            total_obs += obs_count
            if total % progress_every == 0:
                elapsed = time.time() - t_start
                rate = total / elapsed if elapsed > 0 else 0
                progress = (
                    f"{total}/{total_items}" if total_items is not None else str(total)
                )
                print(
                    f" [{progress} traces] observations={total_obs} "
                    f"failed={failures} rate={rate:.1f} traces/s"
                )
        except Exception as exc:
            print(f" failed trace {t.id}: {exc}")
            failures += 1
    print(
        f" page {page}/{total_pages or '?'} done — "
        f"cumulative: traces={total}, observations={total_obs}, failed={failures}"
    )
    if total_pages is not None and page >= total_pages:
        break
    page += 1
# Block until the span processor has exported everything queued.
otlp_provider.force_flush()
elapsed = time.time() - t_start
print(
    f"\nObservations — migrated: {total} traces / {total_obs} observations, "
    f"failed: {failures}, elapsed: {elapsed:.1f}s"
)
print(f" trace_id_map entries: {len(trace_id_map)}")
print(f" obs_id_map entries: {len(obs_id_map)}")5. Scores
Scores are fetched from the source via api.scores.get_many and re-created on the destination via the native create_score helper. Original id is preserved so re-runs are idempotent. config_id is remapped through config_id_map from Section 1; trace_id / observation_id are remapped through trace_id_map / obs_id_map from Section 4.
# Section 5: re-create scores on the destination with remapped references.
# Fallback to empty maps so this cell can run standalone; unmapped IDs then
# pass through unchanged.
trace_id_map = globals().get("trace_id_map", {})
obs_id_map = globals().get("obs_id_map", {})
config_id_map = globals().get("config_id_map", {})
page = 1
migrated = failed = skipped = 0
while True:
    resp = with_retries(lambda: src.api.scores.get_many(
        page=page,
        limit=100,
        from_timestamp=FROM_TS,
        to_timestamp=TO_TS,
    ))
    if not resp.data:
        break
    for s in resp.data:
        # Remap references
        new_trace_id = trace_id_map.get(s.trace_id, s.trace_id) if s.trace_id else None
        new_obs_id = obs_id_map.get(s.observation_id, s.observation_id) if s.observation_id else None
        new_cfg_id = config_id_map.get(s.config_id, s.config_id) if s.config_id else None
        # Determine typed value
        # NOTE(review): only CATEGORICAL scores use string_value here; BOOLEAN
        # scores take the numeric branch — confirm that matches the
        # destination's expectations.
        if s.data_type == "CATEGORICAL":
            value = getattr(s, "string_value", None) or str(s.value)
        else:
            value = s.value
        if value is None:
            skipped += 1
            continue
        try:
            with_retries(lambda: dst.create_score(
                name=s.name,
                value=value,
                data_type=s.data_type,
                trace_id=new_trace_id,
                observation_id=new_obs_id,
                session_id=getattr(s, "session_id", None),
                dataset_run_id=getattr(s, "dataset_run_id", None),
                score_id=s.id,  # preserves ID for idempotent re-runs
                comment=s.comment,
                config_id=new_cfg_id,
                metadata=s.metadata,
                timestamp=s.timestamp,
            ))
            migrated += 1
        except Exception as exc:
            print(f" failed score {s.id} ({s.name}): {exc}")
            failed += 1
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1
# Flush scores (create_score queues events on the dst client)
dst.flush()
print(f"Scores — migrated: {migrated}, skipped (null value): {skipped}, failed: {failed}")6. Datasets (items + run items)
We migrate datasets themselves via the native create_dataset helper, items via create_dataset_item (preserving source item IDs), and run items via the REST endpoint. Dataset run items reference migrated traces/observations using trace_id_map / obs_id_map.
# Section 6: migrate datasets, their items (preserving item IDs so run items
# keep linking), and dataset-run items re-pointed at the migrated traces.
# Fallback to empty maps when this cell is run standalone (see Scores section).
trace_id_map = globals().get("trace_id_map", {})
obs_id_map = globals().get("obs_id_map", {})
from urllib.parse import quote as _urlquote


def _enc(name: str) -> str:
    """URL-encode a dataset/run name for use as a path parameter ('/'-safe)."""
    return _urlquote(name, safe="")


ds_migrated = ds_skipped = ds_failed = 0
items_migrated = items_failed = 0
runs_created = runs_failed = 0
t_start = time.time()
ds_seen = 0  # count of source datasets processed (new + existing)
page_ds = 1
while True:
    dsresp = with_retries(lambda: src.api.datasets.list(page=page_ds, limit=100))
    if not dsresp.data:
        break
    total_ds = getattr(dsresp.meta, "total_items", None)
    total_pages = getattr(dsresp.meta, "total_pages", None)
    if page_ds == 1 and total_ds is not None:
        print(f"Starting dataset migration: {total_ds} dataset(s) across {total_pages} page(s)")
    for ds in dsresp.data:
        ds_seen += 1
        progress = f"{ds_seen}/{total_ds}" if total_ds is not None else str(ds_seen)
        print(f" [{progress}] '{ds.name}'")
        # Create dataset if not present. Path parameters must be URL-encoded
        # for folder datasets (names containing "/").
        try:
            dst.api.datasets.get(dataset_name=_enc(ds.name))
            ds_skipped += 1
            print(f" dataset already exists in destination — skipping create")
        except Exception as exc:
            # Anything other than a clean 404 means the lookup itself failed;
            # skip this dataset entirely.
            if getattr(exc, "status_code", None) != 404:
                print(f" failed dataset lookup: {exc}")
                ds_failed += 1
                continue
            try:
                with_retries(lambda: dst.create_dataset(
                    name=ds.name, description=ds.description, metadata=ds.metadata
                ))
                ds_migrated += 1
                print(f" created dataset in destination")
            except Exception as exc2:
                print(f" failed to create dataset: {exc2}")
                ds_failed += 1
                continue
        # Items (preserve source IDs so run items link correctly)
        ds_items_before = items_migrated
        ds_items_failed_before = items_failed
        page_item = 1
        while True:
            iresp = with_retries(lambda: src.api.dataset_items.list(
                dataset_name=ds.name, page=page_item, limit=100
            ))
            if not iresp.data:
                break
            for it in iresp.data:
                try:
                    # source_trace_id / source_observation_id are remapped to
                    # migrated IDs when known, otherwise passed through.
                    with_retries(lambda: dst.create_dataset_item(
                        dataset_name=ds.name,
                        id=it.id,
                        input=it.input,
                        expected_output=it.expected_output,
                        metadata=it.metadata,
                        source_trace_id=trace_id_map.get(it.source_trace_id, it.source_trace_id) if it.source_trace_id else None,
                        source_observation_id=obs_id_map.get(it.source_observation_id, it.source_observation_id) if it.source_observation_id else None,
                        status=it.status,
                    ))
                    items_migrated += 1
                except Exception as exc:
                    print(f" failed item {it.id}: {exc}")
                    items_failed += 1
            if page_item >= getattr(iresp.meta, "total_pages", page_item):
                break
            page_item += 1
        ds_items = items_migrated - ds_items_before
        ds_items_fail = items_failed - ds_items_failed_before
        print(f" items: {ds_items} migrated, {ds_items_fail} failed")
        # Run items. Path params (dataset_name, run_name) need URL-encoding.
        ds_runs_before = runs_created
        ds_runs_failed_before = runs_failed
        run_count = 0
        page_run = 1
        while True:
            rresp = with_retries(lambda: src.api.datasets.get_runs(
                dataset_name=_enc(ds.name), page=page_run, limit=100
            ))
            if not rresp.data:
                break
            for run_meta in rresp.data:
                run_count += 1
                try:
                    run_full = with_retries(lambda: src.api.datasets.get_run(
                        dataset_name=_enc(ds.name), run_name=_enc(run_meta.name)
                    ))
                except Exception as exc:
                    print(f" failed run fetch '{run_meta.name}': {exc}")
                    runs_failed += 1
                    continue
                for ri in run_full.dataset_run_items:
                    new_trace = trace_id_map.get(ri.trace_id, ri.trace_id) if ri.trace_id else None
                    new_obs = obs_id_map.get(ri.observation_id, ri.observation_id) if ri.observation_id else None
                    # A run item must point at something; skip when neither
                    # reference is available.
                    if not new_trace and not new_obs:
                        runs_failed += 1
                        continue
                    try:
                        with_retries(lambda: dst.api.dataset_run_items.create(
                            run_name=run_meta.name,
                            dataset_item_id=ri.dataset_item_id,
                            trace_id=new_trace,
                            observation_id=new_obs,
                            run_description=run_meta.description,
                            metadata=run_meta.metadata,
                        ))
                        runs_created += 1
                    except Exception as exc:
                        print(f" failed run item for run '{run_meta.name}': {exc}")
                        runs_failed += 1
            if page_run >= getattr(rresp.meta, "total_pages", page_run):
                break
            page_run += 1
        ds_runs = runs_created - ds_runs_before
        ds_runs_fail = runs_failed - ds_runs_failed_before
        print(
            f" runs: {run_count} run(s), {ds_runs} run item(s) linked, "
            f"{ds_runs_fail} failed"
        )
    if page_ds >= getattr(dsresp.meta, "total_pages", page_ds):
        break
    page_ds += 1
elapsed = time.time() - t_start
print(
    f"\nDatasets — new: {ds_migrated}, existing: {ds_skipped}, failed: {ds_failed} | "
    f"items: {items_migrated} migrated / {items_failed} failed | "
    f"run items: {runs_created} linked / {runs_failed} failed | "
    f"elapsed: {elapsed:.1f}s"
)