Telemetry API
The weevr.telemetry module provides structured logging, execution span
tracking, and progress reporting. Telemetry settings cascade through the
loom/weave/thread hierarchy via ExecutionConfig.
weevr.telemetry
weevr telemetry — structured observability for execution pipelines.
__all__ = ['ExecutionSpan', 'SpanEvent', 'SpanStatus', 'generate_trace_id', 'generate_span_id', 'SpanCollector', 'SpanBuilder', 'LogEvent', 'create_log_event', 'LogLevel', 'StructuredJsonFormatter', 'configure_logging', 'ValidationResult', 'AssertionResult', 'ThreadTelemetry', 'WeaveTelemetry', 'LoomTelemetry', 'ThreadTrace', 'WeaveTrace', 'LoomTrace']
module-attribute
SpanBuilder
Mutable builder for constructing an ExecutionSpan incrementally.
Created by SpanCollector.start_span(). Accumulate attributes and
events during execution, then call finish() to produce an immutable
ExecutionSpan.
span_id
property
The pre-assigned span ID for this builder.
trace_id
property
The trace ID this span belongs to.
__init__(trace_id, span_id, name, parent_span_id)
Initialize a span builder with identity and parent linkage.
set_attribute(key, value)
Set a key-value attribute on the span.
add_event(name, attributes=None)
Add a timestamped event to the span.
finish(status=SpanStatus.OK)
Finalize the span and return an immutable ExecutionSpan.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| status | SpanStatus | Final status of the span. | OK |
Returns:
| Type | Description |
|---|---|
| ExecutionSpan | Immutable ExecutionSpan with all accumulated attributes and events. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If finish() has already been called on this builder. |
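The finish-once contract of the builder can be illustrated with a small stand-in. This is a sketch using only the standard library: `MiniSpan` and `MiniSpanBuilder` are hypothetical names, the string status is a simplification of SpanStatus, and none of this is weevr's actual implementation.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from types import MappingProxyType


@dataclass(frozen=True)
class MiniSpan:
    """Hypothetical immutable result, standing in for ExecutionSpan."""
    name: str
    status: str
    start_time: datetime
    end_time: datetime
    attributes: MappingProxyType


class MiniSpanBuilder:
    """Hypothetical stand-in mirroring the documented SpanBuilder contract."""

    def __init__(self, name):
        self._name = name
        self._start = datetime.now(timezone.utc)
        self._attributes = {}
        self._finished = False

    def set_attribute(self, key, value):
        self._attributes[key] = value

    def finish(self, status="OK"):
        # A second call is rejected, matching the documented RuntimeError.
        if self._finished:
            raise RuntimeError("finish() has already been called on this builder")
        self._finished = True
        return MiniSpan(
            name=self._name,
            status=status,
            start_time=self._start,
            end_time=datetime.now(timezone.utc),
            # Copy, then wrap read-only so later builder changes cannot leak in.
            attributes=MappingProxyType(dict(self._attributes)),
        )


builder = MiniSpanBuilder("thread:customer_dim")
builder.set_attribute("rows_written", 42)
span = builder.finish()

try:
    builder.finish()
except RuntimeError:
    second_call_failed = True
else:
    second_call_failed = False
```

Copying the attribute dict at finish time is one way to keep the finished span independent of the mutable builder; whether weevr does the same is not stated in this reference.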
SpanCollector
Mutable collector that accumulates finished spans during execution.
Each execution scope (thread, weave, loom) gets its own collector. Thread-level collectors are merged into the weave-level collector after thread completion, ensuring no contention during concurrent execution.
trace_id
property
The trace ID this collector is bound to.
__init__(trace_id)
Initialize a collector bound to a trace ID.
start_span(name, parent_span_id=None)
Start building a new span.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Human-readable span name (e.g., "thread:customer_dim"). | required |
| parent_span_id | str \| None | Optional parent span ID for hierarchy. | None |
Returns:
| Type | Description |
|---|---|
| SpanBuilder | A mutable SpanBuilder to accumulate attributes and events. |
add_span(span)
Add a finished span to the collector.
merge(other)
Merge spans from another collector into this one.
Used to combine thread-level collectors into the weave-level collector after thread completion.
get_spans()
Return all collected spans.
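The per-scope collector pattern described for SpanCollector (one collector per thread, merged into the weave-level collector after completion) can be sketched as follows. `MiniCollector` and `run_thread` are hypothetical stand-ins that store plain strings instead of ExecutionSpan objects; this is not weevr's implementation.

```python
from concurrent.futures import ThreadPoolExecutor


class MiniCollector:
    """Hypothetical stand-in for the documented SpanCollector interface."""

    def __init__(self, trace_id):
        self.trace_id = trace_id
        self._spans = []

    def add_span(self, span):
        self._spans.append(span)

    def merge(self, other):
        self._spans.extend(other.get_spans())

    def get_spans(self):
        # Return a copy so callers cannot mutate internal state.
        return list(self._spans)


def run_thread(trace_id, name):
    # Each worker writes only to its own collector, so no lock is needed.
    local = MiniCollector(trace_id)
    local.add_span(f"thread:{name}")
    return local


trace_id = "0" * 32  # placeholder trace ID for the sketch
weave = MiniCollector(trace_id)
names = ["customer_dim", "order_fact", "product_dim"]

with ThreadPoolExecutor(max_workers=3) as pool:
    # map() yields results in input order; merging runs on the caller only.
    for local in pool.map(run_thread, [trace_id] * len(names), names):
        weave.merge(local)

merged_spans = weave.get_spans()
```

Because the shared collector is touched only after each worker has returned, concurrent workers never write to the same list.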
LogEvent
Bases: FrozenBase
Frozen Pydantic model for a structured log event.
Each execution step can produce one or more LogEvents. These are
serialized to JSON via StructuredJsonFormatter and emitted
through Python's logging module.
Attributes:
| Name | Type | Description |
|---|---|---|
| timestamp | datetime | UTC timestamp of the event. |
| level | str | Log level string (INFO, WARNING, ERROR, DEBUG). |
| component | str | Dotted module path (e.g., "engine.executor"). |
| thread_name | str \| None | Thread being executed, if applicable. |
| weave_name | str \| None | Weave being executed, if applicable. |
| loom_name | str \| None | Loom being executed, if applicable. |
| trace_id | str \| None | Trace ID correlating this event to an execution trace. |
| span_id | str \| None | Span ID of the active span when this event was emitted. |
| message | str | Human-readable log message. |
| attributes | dict[str, str \| int \| float \| bool] | Additional key-value context. |
LogLevel
Bases: StrEnum
Configurable log level for weevr execution.
Controls the verbosity of structured logging output during pipeline execution. Maps to Python logging levels internally.
StructuredJsonFormatter
Bases: Formatter
Formats log records as structured JSON with OTel-compatible field names.
Produces one JSON object per log line with fields: timestamp, level, logger, message, and any extra attributes attached to the log record.
format(record)
Format a log record as a JSON string.
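The one-JSON-object-per-line output described for StructuredJsonFormatter can be approximated with the standard library. In this sketch, `MiniJsonFormatter` is a hypothetical stand-in, and carrying extra context in a record attribute named `attributes` is an assumption; the source does not say how weevr attaches extras.

```python
import json
import logging
from datetime import datetime, timezone


class MiniJsonFormatter(logging.Formatter):
    """Hypothetical stand-in; not weevr's StructuredJsonFormatter."""

    def format(self, record):
        payload = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Assumption: extra context is attached as record.attributes.
        payload.update(getattr(record, "attributes", {}))
        return json.dumps(payload)


record = logging.LogRecord(
    name="weevr",
    level=logging.INFO,
    pathname="example.py",
    lineno=0,
    msg="wrote %d rows",
    args=(42,),
    exc_info=None,
)
record.attributes = {"rows_written": 42}

line = MiniJsonFormatter().format(record)
parsed = json.loads(line)
```

In normal use the formatter would be attached with `handler.setFormatter(MiniJsonFormatter())`, and callers would pass context through the `extra` argument of the logging call.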
AssertionResult
Bases: FrozenBase
Outcome of a single post-execution assertion.
Attributes:
| Name | Type | Description |
|---|---|---|
| assertion_type | str | Type of assertion (row_count, column_not_null, unique, expression). |
| severity | Severity | Configured severity level. |
| passed | bool | Whether the assertion passed. |
| details | str | Human-readable outcome description. |
| columns | list[str] \| None | Columns involved, if applicable. |
LoomTelemetry
Bases: FrozenBase
Telemetry data composed into a LoomResult.
Attributes:
| Name | Type | Description |
|---|---|---|
| span | ExecutionSpan | The execution span for this loom. |
| weave_telemetry | dict[str, WeaveTelemetry] | Per-weave telemetry keyed by weave name. |
ThreadTelemetry
Bases: FrozenBase
Telemetry data composed into a ThreadResult.
Attributes:
| Name | Type | Description |
|---|---|---|
| span | ExecutionSpan | The execution span for this thread. |
| validation_results | list[ValidationResult] | Results of pre-write validation rules. |
| assertion_results | list[AssertionResult] | Results of post-write assertions. |
| rows_read | int | Total rows read from sources. |
| rows_written | int | Rows written to the target. |
| rows_quarantined | int | Rows written to the quarantine table. |
| load_mode | str \| None | Load mode used (full, incremental_watermark, etc.). |
| watermark_column | str \| None | Watermark column name, if applicable. |
| watermark_previous_value | str \| None | Prior high-water mark (HWM) value before this run. |
| watermark_new_value | str \| None | New HWM value captured during this run. |
| watermark_persisted | bool | Whether watermark state was successfully persisted. |
| watermark_first_run | bool | Whether this was the first run (no prior state). |
| cdc_inserts | int \| None | Number of CDC insert operations. |
| cdc_updates | int \| None | Number of CDC update operations. |
| cdc_deletes | int \| None | Number of CDC delete operations. |
ValidationResult
Bases: FrozenBase
Outcome of a single validation rule evaluation.
Attributes:
| Name | Type | Description |
|---|---|---|
| rule_name | str | Name of the validation rule. |
| expression | str | The Spark SQL expression that was evaluated. |
| severity | Severity | Severity level of the rule. |
| rows_passed | int | Number of rows that passed the rule. |
| rows_failed | int | Number of rows that failed the rule. |
| applied | bool | False if the rule expression itself errored during evaluation. |
WeaveTelemetry
Bases: FrozenBase
Telemetry data composed into a WeaveResult.
Attributes:
| Name | Type | Description |
|---|---|---|
| span | ExecutionSpan | The execution span for this weave. |
| thread_telemetry | dict[str, ThreadTelemetry] | Per-thread telemetry keyed by thread name. |
| hook_results | list[Any] | Results from pre/post hook step execution. |
| lookup_results | list[Any] | Results from lookup materialization. |
| variables | dict[str, Any] | Final variable values at end of weave execution. |
ExecutionSpan
Bases: FrozenBase
OTel-compatible execution span for telemetry.
Represents a unit of work (thread, weave, or loom execution) with timing, status, and key-value metric attributes. Field naming follows OTel Span semantic conventions for future export compatibility.
Attributes:
| Name | Type | Description |
|---|---|---|
| trace_id | str | 32-character hex string identifying the execution trace. |
| span_id | str | 16-character hex string identifying this span. |
| parent_span_id | str \| None | span_id of the parent span, or None for root spans. |
| name | str | Human-readable span name (e.g., "thread:customer_dim"). |
| status | SpanStatus | Outcome status of this span. |
| start_time | datetime | UTC timestamp when the span started. |
| end_time | datetime \| None | UTC timestamp when the span ended, or None if still open. |
| attributes | dict[str, str \| int \| float \| bool] | Key-value metric attributes (row counts, durations, etc.). |
| events | list[SpanEvent] | Timestamped events that occurred within this span. |
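Because end_time may be None while a span is still open, code that derives a duration from start_time and end_time has to handle that case. A minimal sketch follows; `span_duration_ms` is a hypothetical helper and not part of weevr.telemetry.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def span_duration_ms(start_time: datetime, end_time: Optional[datetime]) -> Optional[float]:
    """Return elapsed milliseconds, or None while the span is still open."""
    if end_time is None:
        return None
    return (end_time - start_time).total_seconds() * 1000.0


start = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
closed_ms = span_duration_ms(start, start + timedelta(milliseconds=250))
open_ms = span_duration_ms(start, None)
```

Both timestamps are timezone-aware UTC values here, matching the attribute descriptions; mixing naive and aware datetimes would raise a TypeError on subtraction.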
SpanEvent
Bases: FrozenBase
A timestamped event within a span.
SpanStatus
Bases: StrEnum
Status of an execution span, aligned with OTel conventions.
LoomTrace
Bases: FrozenBase
Trace node for a loom execution — the root of the trace tree.
Provides both tree-shaped navigation for programmatic access and flat span serialization for OTel export.
Attributes:
| Name | Type | Description |
|---|---|---|
| span | ExecutionSpan | The execution span for this loom. |
| weaves | dict[str, WeaveTrace] | Per-weave traces keyed by weave name. |
to_spans()
Recursively flatten the entire trace tree into a span list.
Returns spans in tree order: loom span, then each weave span followed by its thread spans.
ThreadTrace
Bases: FrozenBase
Trace node for a single thread execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| span | ExecutionSpan | The execution span for this thread. |
| telemetry | ThreadTelemetry | Full telemetry data for the thread. |
to_spans()
Return this thread's span as a flat list.
WeaveTrace
Bases: FrozenBase
Trace node for a weave execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| span | ExecutionSpan | The execution span for this weave. |
| threads | dict[str, ThreadTrace] | Per-thread traces keyed by thread name. |
to_spans()
Flatten weave span and all child thread spans into a list.
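The tree-order flattening described for the trace classes (loom span, then each weave span followed by its thread spans) can be sketched with plain dataclasses. `ThreadNode`, `WeaveNode`, and `LoomNode` are hypothetical stand-ins that hold span names as strings, and iterating children in dict insertion order is an assumption of this sketch.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ThreadNode:
    span: str

    def to_spans(self):
        # A leaf contributes only its own span.
        return [self.span]


@dataclass(frozen=True)
class WeaveNode:
    span: str
    threads: dict = field(default_factory=dict)

    def to_spans(self):
        spans = [self.span]
        for thread in self.threads.values():
            spans.extend(thread.to_spans())
        return spans


@dataclass(frozen=True)
class LoomNode:
    span: str
    weaves: dict = field(default_factory=dict)

    def to_spans(self):
        # Parent first, then each child subtree in order (pre-order walk).
        spans = [self.span]
        for weave in self.weaves.values():
            spans.extend(weave.to_spans())
        return spans


loom = LoomNode(
    "loom:nightly",
    {
        "dims": WeaveNode("weave:dims", {"customer_dim": ThreadNode("thread:customer_dim")}),
        "facts": WeaveNode("weave:facts", {"order_fact": ThreadNode("thread:order_fact")}),
    },
)
flat = loom.to_spans()
```

The nested dicts support lookup by name, while the flat list is the shape a span exporter would typically consume.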
create_log_event(level, component, message, *, thread_name=None, weave_name=None, loom_name=None, trace_id=None, span_id=None, attributes=None)
Create a LogEvent with auto-generated UTC timestamp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| level | str | Log level (INFO, WARNING, ERROR, DEBUG). | required |
| component | str | Source component (e.g., "engine.executor"). | required |
| message | str | Human-readable message. | required |
| thread_name | str \| None | Optional thread context. | None |
| weave_name | str \| None | Optional weave context. | None |
| loom_name | str \| None | Optional loom context. | None |
| trace_id | str \| None | Optional trace correlation ID. | None |
| span_id | str \| None | Optional span correlation ID. | None |
| attributes | dict[str, str \| int \| float \| bool] \| None | Optional additional key-value attributes. | None |
Returns:
| Type | Description |
|---|---|
| LogEvent | Frozen LogEvent instance. |
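The factory pattern above (positional core fields, keyword-only context, timestamp filled in automatically) can be sketched with a frozen dataclass. `MiniLogEvent` and `make_log_event` are hypothetical stand-ins with a reduced field set; the real LogEvent is a Pydantic model and is not reproduced here.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class MiniLogEvent:
    """Hypothetical stand-in with a subset of the documented fields."""
    timestamp: datetime
    level: str
    component: str
    message: str
    thread_name: Optional[str] = None
    trace_id: Optional[str] = None
    attributes: dict = field(default_factory=dict)


def make_log_event(level, component, message, *, thread_name=None,
                   trace_id=None, attributes=None):
    return MiniLogEvent(
        timestamp=datetime.now(timezone.utc),  # always UTC, never caller-supplied
        level=level,
        component=component,
        message=message,
        thread_name=thread_name,
        trace_id=trace_id,
        attributes=dict(attributes or {}),  # None becomes an empty dict
    )


event = make_log_event(
    "INFO",
    "engine.executor",
    "thread finished",
    thread_name="customer_dim",
    attributes={"rows_written": 42},
)

try:
    event.level = "ERROR"
except AttributeError:  # FrozenInstanceError subclasses AttributeError
    frozen = True
else:
    frozen = False
```

Keyword-only context arguments keep call sites readable when only one or two of the optional fields are supplied.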
configure_logging(log_level=LogLevel.STANDARD, logger_name='weevr')
Configure a weevr logger with structured JSON output.
Sets up a logger with a StructuredJsonFormatter handler at the
appropriate Python logging level based on the weevr LogLevel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| log_level | LogLevel | weevr log level to configure. | STANDARD |
| logger_name | str | Name of the logger to configure. | 'weevr' |
Returns:
| Type | Description |
|---|---|
| Logger | Configured logger instance. |
generate_span_id()
Generate a 16-character hex span ID (OTel convention).
generate_trace_id()
Generate a 32-character hex trace ID (OTel convention).
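The two ID lengths correspond to 16 and 8 random bytes rendered as hex. One way to produce IDs of that shape with the standard library is shown below; `make_trace_id`, `make_span_id`, and `is_hex_id` are hypothetical names, and the source does not state which random source weevr uses.

```python
import secrets
import string


def make_trace_id() -> str:
    """16 random bytes rendered as 32 lowercase hex characters."""
    return secrets.token_hex(16)


def make_span_id() -> str:
    """8 random bytes rendered as 16 lowercase hex characters."""
    return secrets.token_hex(8)


def is_hex_id(value: str, length: int) -> bool:
    """Check that value has the expected length and only hex digits."""
    return len(value) == length and all(c in string.hexdigits for c in value)


trace_id = make_trace_id()
span_id = make_span_id()
```

A check like `is_hex_id` can be useful when IDs arrive from outside the process, for example when correlating with an upstream trace.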