Tracing
Dreadnode tracing is built on OpenTelemetry and implemented via logfire. The SDK exposes
TaskSpan context managers for structured traces, plus lightweight Span objects for when you
only need timing and tags.
Quick start

```python
import dreadnode as dn

dn.configure(
    server="https://api.dreadnode.io",
    api_key="dn_...",
    organization="acme",
    project="support-bots",
    trace_backend="remote",
)

with dn.task_span("data-prep", type="task", tags=["etl"]) as span:
    span.log_input("rows", 250)
    span.log_metric("latency_ms", 112, step=1)
    span.log_output("status", "ok")
```

TaskSpan vs Span
TaskSpan captures inputs, outputs, metrics, params, artifacts, and child tasks. Use Span
for lightweight timing and tagging when you do not need structured inputs/outputs.

```python
import dreadnode as dn
from dreadnode.tracing.span import Span, TaskSpan

with dn.span("cache-warmup") as span:
    assert isinstance(span, Span)

with dn.task_span("model-inference", type="task") as task_span:
    assert isinstance(task_span, TaskSpan)
```

SpanType values
SpanType is a literal type that describes what kind of work a span represents:
"task", "evaluation", "agent", "study", "tool", "trial", "sample",
"generation", "scorer", and "span".
```python
import dreadnode as dn
from dreadnode.tracing.constants import SpanType

span_type: SpanType = "evaluation"

with dn.task_span("eval:baseline", type=span_type):
    pass
```

Runs and task decorators
dn.run()

Create a top-level task span with trace infrastructure setup. This is the recommended entry point for standalone scripts.

```python
import dreadnode as dn

with dn.run("experiment-1", project="my-project", tags=["baseline"]) as run:
    run.log_param("model", "gpt-4o")
    run.log_input("prompt", "Hello")
    # ... do work ...
    run.log_output("result", "World")
    run.log_metric("score", 0.95)
```

dn.task_and_run()
Context manager that creates a task span and auto-initializes trace infrastructure when no parent context exists. Useful when you want both a run and a task in one call.

```python
import dreadnode as dn

with dn.task_and_run("eval-run", task_name="scoring", task_type="evaluation") as task:
    dn.log_input("dataset", "test-v2")
    dn.log_metric("accuracy", 0.92)
```

@dn.task decorator
Wraps a function so that every call is traced as a task span. Optionally attach scorers that run automatically.

```python
import dreadnode as dn

@dn.task
async def classify(text: str) -> str:
    # The function body is automatically traced.
    return "positive"

# With options
@dn.task(name="classifier", scorers=[my_scorer], log_inputs=True, log_output=True)
async def classify_v2(text: str) -> str:
    return "positive"
```

Logging
Parameters

```python
import dreadnode as dn

dn.log_param("model", "gpt-4o")
dn.log_params(model="gpt-4o", temperature=0.7, max_tokens=1000)
```

Inputs and outputs
```python
import dreadnode as dn

# Single
dn.log_input("prompt", "Write a haiku about security")
dn.log_output("completion", "Firewalls stand guard...")

# Batch
dn.log_inputs(prompt="Write a haiku", context="security domain")
dn.log_outputs(completion="Firewalls stand guard...", tokens_used=42)
```

Metrics
```python
import dreadnode as dn

# Single metric
dn.log_metric("score", 0.91, step=0)

# With an aggregation mode
dn.log_metric("latency_ms", 112, aggregation="avg")

# Batch metrics
dn.log_metrics({"accuracy": 0.95, "f1": 0.88}, step=1)
```

Aggregation modes: "direct" (default), "min", "max", "avg", "sum", "count".
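The modes describe how repeated values logged under one metric name are combined. As a rough pure-Python sketch of those semantics (an illustration of the mode names above, not the SDK's actual implementation):

```python
from statistics import mean

# Illustrative mapping from aggregation mode to a combining function.
# The "direct" behavior here (keep the latest value) is an assumption.
AGGREGATORS = {
    "direct": lambda values: values[-1],
    "min": min,
    "max": max,
    "avg": mean,
    "sum": sum,
    "count": len,
}

def aggregate(values, mode="direct"):
    return AGGREGATORS[mode](values)

latencies = [112, 98, 131]
print(aggregate(latencies, "avg"))  # 113.66666666666667
print(aggregate(latencies, "max"))  # 131
```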
Samples
Log input/output pairs as ephemeral task spans. Each sample automatically links its output to its input.

```python
import dreadnode as dn

# Single sample
dn.log_sample(
    "sample-1",
    input="input text",
    output="output text",
    metrics={"accuracy": 0.95},
)

# Batch samples
dn.log_samples(
    "eval-set",
    [
        ("input 1", "output 1"),
        ("input 2", "output 2", {"score": 0.8}),  # with metrics
    ],
)
```

Tags

```python
import dreadnode as dn

dn.tag("baseline", "v2")
```

Object linking
Associate two runtime objects (e.g. link a model output to the input that produced it).

```python
import dreadnode as dn

dn.link_objects(output_obj, input_obj, attributes={"relation": "generated_from"})
```

Flushing
Force-flush pending span data to the backend immediately.

```python
import dreadnode as dn

dn.push_update()
```

Logging artifacts
Artifacts require storage to be configured (via configure). The SDK uploads files to
workspace CAS and stores metadata on the span. Directories are uploaded recursively.

```python
import dreadnode as dn

dn.configure(
    server="https://api.dreadnode.io",
    api_key="dn_...",
    organization="acme",
    project="support-bots",
)

with dn.task_span("training"):
    dn.log_artifact("./checkpoints/model.bin", name="model.bin")
    dn.log_artifact("./results/")  # uploads all files in the directory
```

Accessing current spans
Retrieve the active span from anywhere in your code.

```python
import dreadnode as dn

run = dn.get_current_run()    # current top-level run span, or None
task = dn.get_current_task()  # current task span, or None
```

TaskSpan properties
Key properties available on TaskSpan instances:

| Property | Type | Description |
|---|---|---|
| task_id | str | Unique task identifier. |
| root_id | str | Root span ID for the trace. |
| inputs | dict | Logged inputs. |
| outputs | dict | Logged outputs. |
| metrics | dict | Logged metrics. |
| params | dict | Logged parameters. |
| duration | float \| None | Span duration in seconds. |
| active | bool | Whether the span is currently active. |
| failed | bool | Whether the span recorded a failure. |
| tasks | list | Direct child task spans. |
| all_tasks | list | All descendant task spans. |
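The distinction between tasks and all_tasks is direct children versus the full descendant set. A minimal stand-in sketch of that relationship (the Node class is hypothetical, not the SDK's TaskSpan):

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    # Hypothetical stand-in illustrating TaskSpan's child-task structure.
    name: str
    tasks: list["Node"] = field(default_factory=list)  # direct children only

    @property
    def all_tasks(self) -> list["Node"]:
        # All descendants, depth-first: each child plus that child's descendants.
        out: list["Node"] = []
        for child in self.tasks:
            out.append(child)
            out.extend(child.all_tasks)
        return out

root = Node("run", [Node("prep", [Node("download")]), Node("train")])
print([t.name for t in root.tasks])      # ['prep', 'train']
print([t.name for t in root.all_tasks])  # ['prep', 'download', 'train']
```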
Distributed tracing
Serialize a task context for cross-process or cross-host continuation using W3C TraceContext propagation.

```python
import dreadnode as dn

# On the originating process
instance = dn.get_default_instance()
context = instance.get_task_context()
# context is a TypedDict: {task_id, task_name, project, trace_context}

# Send the context to the remote process (e.g. via a message queue)...

# On the remote process
instance = dn.get_default_instance()
with instance.continue_task(context) as task:
    task.log_metric("remote_score", 0.88)
```

Binding session IDs
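The trace_context field carries standard W3C TraceContext data. As an illustration of that wire format (a sketch of the spec itself, not of the SDK's payload), a traceparent header packs a version, a 16-byte trace ID, an 8-byte parent span ID, and flags:

```python
# Parse a W3C `traceparent` header into its four fields.
# Format per the W3C Trace Context spec:
#   version "-" trace-id "-" parent-id "-" trace-flags
def parse_traceparent(header: str) -> dict:
    version, trace_id, parent_id, flags = header.split("-")
    assert len(trace_id) == 32 and len(parent_id) == 16
    return {
        "version": version,
        "trace_id": trace_id,
        "parent_id": parent_id,
        "sampled": int(flags, 16) & 0x01 == 1,  # bit 0 is the sampled flag
    }

header = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
print(parse_traceparent(header)["sampled"])  # True
```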
Use bind_session_id to route spans to a session-scoped JSONL file.

```python
import dreadnode as dn
from dreadnode.tracing.span import bind_session_id

with bind_session_id("session_123"):
    with dn.task_span("chat:turn-1"):
        dn.log_input("prompt", "Hi")
        dn.log_output("response", "Hello!")
```

TraceBackend configuration
TraceBackend controls whether the SDK streams spans remotely in addition to writing local JSONL.

| Setting | Local JSONL | Remote OTLP |
|---|---|---|
| "local" | Always written | Disabled |
| "remote" | Always written | Enabled |
| None (default) | Always written | Auto-enabled if server, api_key, and organization are all provided |

```python
import dreadnode as dn
from dreadnode.tracing.exporters import TraceBackend

backend: TraceBackend = "remote"

dn.configure(
    server="https://api.dreadnode.io",
    api_key="dn_...",
    organization="acme",
    project="support-bots",
    trace_backend=backend,
)
```

Remote spans are sent via OTLP to {server}/api/v1/org/{org}/otel/traces with gzip
compression and API key authentication.
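The endpoint is a simple template over the configured server and organization. A hypothetical helper (not part of the SDK) that expands it:

```python
def otel_traces_endpoint(server: str, organization: str) -> str:
    # Hypothetical helper: expands the documented template
    # {server}/api/v1/org/{org}/otel/traces for the configured values.
    return f"{server.rstrip('/')}/api/v1/org/{organization}/otel/traces"

print(otel_traces_endpoint("https://api.dreadnode.io", "acme"))
# https://api.dreadnode.io/api/v1/org/acme/otel/traces
```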
Local trace files
Local JSONL files are always written regardless of the trace_backend setting.

- Session-bound spans: ~/.dreadnode/sessions/<session_id>/chat_<session_id>.jsonl
- Run-scoped spans: ~/.dreadnode/projects/<project>/<run_id>/spans.jsonl
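Each line in these files is one JSON-encoded span, so they can be inspected with a few lines of standard-library Python. A sketch (the "name"/"duration" keys below are illustrative sample data, not the SDK's exact span schema):

```python
import json
from pathlib import Path

def read_spans(path: Path) -> list[dict]:
    """Parse a JSONL trace file: one JSON object per non-empty line."""
    with path.open() as f:
        return [json.loads(line) for line in f if line.strip()]

# A locally created file stands in for a real spans.jsonl.
path = Path("spans.jsonl")
path.write_text(
    '{"name": "data-prep", "duration": 0.42}\n'
    '{"name": "training", "duration": 12.9}\n'
)

spans = read_spans(path)
print([s["name"] for s in spans])  # ['data-prep', 'training']
```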