Observability¶
weevr provides structured telemetry at every level of execution. This guide covers logging, execution spans, trace hierarchies, and how to route telemetry data to external systems.
Structured JSON logging¶
All log output from weevr is emitted as structured JSON through Python's
standard logging module. The StructuredJsonFormatter formats each log
record as a single-line JSON object with OTel-compatible field names:
```json
{
  "timestamp": "2025-06-15T14:30:22.451000+00:00",
  "level": "INFO",
  "logger": "weevr.engine.executor",
  "message": "Thread 'stg_customers' completed: 1204 rows written",
  "thread_name": "stg_customers",
  "weave_name": "staging",
  "trace_id": "a1b2c3d4e5f67890a1b2c3d4e5f67890",
  "span_id": "f1e2d3c4b5a67890"
}
```
Each entry includes:

- `timestamp` -- ISO 8601 UTC timestamp
- `level` -- Python log level (`DEBUG`, `INFO`, `WARNING`, `ERROR`)
- `logger` -- Dotted module path that emitted the record
- `message` -- Human-readable description
- `thread_name`, `weave_name`, `loom_name` -- Execution context (included when available)
- `trace_id`, `span_id` -- Correlation IDs linking the log entry to its execution span
- `attributes` -- Optional key-value map with additional context
Because the output is JSON, it integrates directly with log aggregation systems (Azure Monitor, Elasticsearch, Splunk) without custom parsing.
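Because each record is a single JSON object per line, downstream tooling can filter and route without regex parsing. As a small self-contained sketch (the field names follow the example above; the sample records are illustrative):

```python
import json

def parse_log_lines(lines, trace_id=None, min_level="INFO"):
    """Parse newline-delimited JSON log records, optionally filtering
    by trace_id and a minimum log level."""
    order = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40}
    threshold = order[min_level]
    records = []
    for line in lines:
        record = json.loads(line)
        # Drop records below the requested level
        if order.get(record["level"], 0) < threshold:
            continue
        # Optionally keep only records from one execution trace
        if trace_id and record.get("trace_id") != trace_id:
            continue
        records.append(record)
    return records

raw = [
    '{"timestamp": "2025-06-15T14:30:22.451000+00:00", "level": "INFO", '
    '"logger": "weevr.engine.executor", "message": "Thread completed", '
    '"trace_id": "a1b2c3d4e5f67890a1b2c3d4e5f67890"}',
    '{"timestamp": "2025-06-15T14:30:23.000000+00:00", "level": "DEBUG", '
    '"logger": "weevr.engine.executor", "message": "Cache hit", '
    '"trace_id": "a1b2c3d4e5f67890a1b2c3d4e5f67890"}',
]
info_and_up = parse_log_lines(raw)  # keeps only the INFO record
```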
Log levels¶
weevr defines four log levels that map to Python's built-in levels:
| weevr level | Python level | What it includes |
|---|---|---|
| `minimal` | `WARNING` | Warnings and errors only |
| `standard` | `INFO` | Execution milestones, row counts, timing |
| `verbose` | `DEBUG` | Step-level detail, cache events, DAG decisions |
| `debug` | `DEBUG` | Full diagnostic output including config dumps |
The default is `standard`, which provides enough information to confirm execution success and diagnose common issues without flooding the output.
Configuring log level¶
Set the log level when constructing a `Context`:

```python
from weevr import Context

ctx = Context(spark, "my-project.weevr", log_level="verbose")
result = ctx.run("nightly.loom")
```
You can also set it in YAML execution settings on a loom or weave, where it cascades through configuration inheritance. The `Context` constructor value takes precedence over YAML settings.
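The exact YAML layout depends on your configuration schema; as a purely hypothetical sketch, a weave-level setting might look like:

```yaml
# Hypothetical weave configuration -- key names are illustrative,
# check your project's config schema for the actual layout
execution:
  log_level: verbose
```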
Execution spans¶
weevr uses an OTel-compatible span model to capture timing, status, and metrics for every unit of work. Spans form a hierarchy that mirrors the config structure.
Span model¶
Each span is an `ExecutionSpan` with the following fields:

| Field | Description |
|---|---|
| `trace_id` | 32-character hex string identifying the execution trace |
| `span_id` | 16-character hex string identifying this span |
| `parent_span_id` | Parent span's ID, or `None` for the root span |
| `name` | Human-readable label (e.g., `"thread:stg_customers"`) |
| `status` | `OK`, `ERROR`, or `UNSET` |
| `start_time` | UTC timestamp when the span started |
| `end_time` | UTC timestamp when the span ended |
| `attributes` | Key-value metrics (row counts, durations, etc.) |
| `events` | Timestamped events within the span |
Trace IDs and span IDs follow OTel conventions (hex-encoded random bytes), making them compatible with tracing backends that accept OTel data.
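IDs of this shape can be generated with the standard library alone; the following is a sketch of the OTel convention (128-bit trace IDs, 64-bit span IDs as lowercase hex), not weevr's actual implementation:

```python
import secrets

def new_trace_id() -> str:
    """128-bit trace ID rendered as 32 lowercase hex characters."""
    return secrets.token_hex(16)

def new_span_id() -> str:
    """64-bit span ID rendered as 16 lowercase hex characters."""
    return secrets.token_hex(8)

trace_id = new_trace_id()
span_id = new_span_id()
```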
Span events¶
Events mark significant moments within a span. Each `SpanEvent` has a name, timestamp, and optional attributes dict. Common events include:

- `sources_read` -- All source DataFrames loaded
- `pipeline_complete` -- Transform steps finished
- `validation_complete` -- Pre-write validation done
- `write_complete` -- Target write finished
- `watermark_persisted` -- High-water mark saved
Span hierarchy¶
A loom execution produces a three-level span tree: one root span for the loom, a child span per weave, and a leaf span per thread.
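For example, a loom with two weaves might produce a tree like the following (names are illustrative, reusing examples from this guide):

```
loom:nightly
├── weave:staging
│   ├── thread:stg_customers
│   └── thread:stg_orders
└── weave:marts
    └── thread:dim_customer
```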
The parent_span_id on each span links it to its parent, forming a
navigable tree. The trace_id is shared across all spans in a single
execution.
Building spans¶
weevr uses `SpanCollector` and `SpanBuilder` to construct spans during execution:

- A `SpanCollector` is created for the execution scope and holds the shared `trace_id`.
- `collector.start_span("thread:dim_customer", parent_span_id=...)` returns a mutable `SpanBuilder`.
- The builder accumulates attributes and events as the work progresses.
- `builder.finish(SpanStatus.OK)` produces an immutable `ExecutionSpan` that is added to the collector.
- Thread-level collectors merge into the weave-level collector after thread completion.
This design avoids contention during concurrent thread execution -- each thread operates on its own builder, and merging happens only after the thread finishes.
Trace hierarchy and serialization¶
The `telemetry.trace` module provides tree-shaped trace models that wrap spans with their child traces:

- `LoomTrace` -- Root of the tree, containing `WeaveTrace` entries
- `WeaveTrace` -- Contains `ThreadTrace` entries
- `ThreadTrace` -- Leaf node wrapping a single thread span and its telemetry data
Each trace type exposes a `to_spans()` method that recursively flattens the tree into a list of `ExecutionSpan` objects, suitable for export to any OTel-compatible backend:
```python
result = ctx.run("nightly.loom")

# Navigate the tree
for weave_name, weave_trace in result.detail.trace.weaves.items():
    for thread_name, thread_trace in weave_trace.threads.items():
        span = thread_trace.span
        print(f"{thread_name}: {span.status} in {span.duration_ms}ms")

# Or flatten for export
all_spans = result.detail.trace.to_spans()
```
Telemetry results¶
Every `ctx.run()` call in execute mode returns a `RunResult` with a `telemetry` attribute containing structured telemetry data. The type depends on what was executed:

| Config type | Telemetry type | Contents |
|---|---|---|
| Thread | `ThreadTelemetry` | Span, validation/assertion results, row counts, watermark state |
| Weave | `WeaveTelemetry` | Span, per-thread telemetry keyed by thread name |
| Loom | `LoomTelemetry` | Span, per-weave telemetry keyed by weave name |
Thread telemetry fields¶
`ThreadTelemetry` is the most detailed telemetry object. It includes:

- `span` -- The thread's execution span
- `rows_read`, `rows_written`, `rows_quarantined` -- Row counts
- `validation_results` -- Per-rule pass/fail counts and severity
- `assertion_results` -- Post-write assertion outcomes
- `load_mode` -- The load mode used (full, incremental, CDC)
- `watermark_column`, `watermark_previous_value`, `watermark_new_value` -- Watermark state for incremental loads
- `cdc_inserts`, `cdc_updates`, `cdc_deletes` -- CDC operation counts
Accessing telemetry¶
```python
result = ctx.run("nightly.loom")
telemetry = result.telemetry

for weave_name, weave_telem in telemetry.weave_telemetry.items():
    print(f"Weave: {weave_name}")
    print(f"  Duration: {weave_telem.span.end_time - weave_telem.span.start_time}")
    for thread_name, t in weave_telem.thread_telemetry.items():
        print(f"  Thread: {thread_name}")
        print(f"    Read: {t.rows_read}")
        print(f"    Written: {t.rows_written}")
        print(f"    Quarantined: {t.rows_quarantined}")
```
Progress reporting¶
During execution, weevr emits structured log events at key milestones. The events you can expect in a typical loom execution include:
| Phase | Events emitted |
|---|---|
| Config loading | Config parsed, variables resolved, references loaded |
| DAG planning | Dependency graph built, execution order determined |
| Weave start | Weave execution started, thread count logged |
| Thread start | Sources reading, pipeline steps executing |
| Validation | Per-rule pass/fail counts, quarantine row counts |
| Write | Write mode, target path, rows written |
| Watermark | Previous value, new value, persistence status |
| Cache | Persist/unpersist events for cached DataFrames |
| Thread complete | Final row counts, duration, status |
| Weave complete | Aggregate status, thread success/failure counts |
| Loom complete | Overall status, total duration, summary |
At `standard` log level, you see the start/complete events with row counts and timing. At `verbose`, you see every step-level event. At `minimal`, you see only warnings and errors.
Custom logging handlers¶
weevr logs through a standard Python logger named "weevr". To route log
output to an external system, attach a custom handler:
```python
import logging

weevr_logger = logging.getLogger("weevr")

# Example: send logs to a file
file_handler = logging.FileHandler("/tmp/weevr-execution.log")
weevr_logger.addHandler(file_handler)

# Example: send logs to Azure Monitor, Datadog, etc.
# weevr_logger.addHandler(your_custom_handler)
```
Because weevr uses `StructuredJsonFormatter`, each record is already a JSON string. Custom handlers can parse the JSON for routing, filtering, or enrichment.
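For instance, a handler could parse each formatted record and keep only entries for one weave. The sketch below is self-contained, so it uses a stand-in formatter in place of weevr's `StructuredJsonFormatter`; the routing logic and field names are illustrative:

```python
import json
import logging

class JsonRoutingHandler(logging.Handler):
    """Parses the formatted JSON record and captures only entries
    for a given weave (a sketch -- adapt the routing as needed)."""

    def __init__(self, weave_name):
        super().__init__()
        self.weave_name = weave_name
        self.captured = []

    def emit(self, record):
        try:
            payload = json.loads(self.format(record))
        except (json.JSONDecodeError, TypeError):
            return  # not a JSON-formatted record; skip it
        if payload.get("weave_name") == self.weave_name:
            self.captured.append(payload)

class _DemoJsonFormatter(logging.Formatter):
    """Stand-in for StructuredJsonFormatter, just for this sketch."""
    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "weave_name": getattr(record, "weave_name", None),
        })

handler = JsonRoutingHandler("staging")
handler.setFormatter(_DemoJsonFormatter())
logger = logging.getLogger("weevr.demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

# 'extra' keys become attributes on the LogRecord
logger.info("thread complete", extra={"weave_name": "staging"})
logger.info("thread complete", extra={"weave_name": "marts"})
```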
Custom telemetry sinks¶
For more advanced telemetry export -- writing execution metrics to a Delta
table, pushing spans to a tracing backend, or feeding a monitoring
dashboard -- see the
Implement a Custom Telemetry Sink
how-to guide. That guide walks through extracting telemetry from a
RunResult, flattening the hierarchy into rows, and writing to Delta.
Debugging a failing pipeline¶
When a thread fails, use this progression:
- Check `result.status` and `result.summary()` -- Quick overview of what failed.
- Set `log_level="verbose"` -- Rerun with verbose logging to see step-by-step execution detail.
- Inspect telemetry spans -- Look at the failing thread's span for status, attributes, and events. The span's `attributes` dict often contains the error message and the step where failure occurred.
- Check validation results -- If rows were quarantined, review the `validation_results` on the thread's telemetry to see which rules failed and how many rows were affected.
- Use preview mode -- Run with `mode="preview"` to execute transforms against sampled data without writing, isolating whether the issue is in the transform logic or the write step.
```python
ctx = Context(spark, "my-project.weevr", log_level="verbose")
result = ctx.run("nightly.loom")

if result.status != "success":
    print(result.summary())
    # Drill into the failing thread's telemetry
    for weave_name, wt in result.telemetry.weave_telemetry.items():
        for thread_name, tt in wt.thread_telemetry.items():
            if tt.span.status.value == "ERROR":
                print(f"Failed: {thread_name}")
                print(f"  Events: {tt.span.events}")
```
Next steps¶
- Implement Custom Telemetry Sink -- Export telemetry to Delta or external systems
- Execution Modes -- Validate and preview without executing
- Fabric Runtime -- Environment setup and configuration