Context API
The Context class is the primary entry point for running weevr pipelines.
It manages configuration loading, parameter injection, and orchestrates
thread, weave, and loom execution. The run() method returns a RunResult,
and load() returns a LoadedConfig — both documented below.
weevr.context
Context class — the user-facing entry point for weevr.
logger = logging.getLogger(__name__)
module-attribute
ConfigLocation
Bases: ABC
Abstract reference to a config file or directory.
Implementations encapsulate a single addressing scheme (local filesystem or remote Hadoop URI) and expose the minimal surface area the config pipeline needs: joining, existence checks, text reads, name and parent derivation, and a containment check used for path-traversal protection.
name
abstractmethod
property
The final path segment, including any extension.
stem
abstractmethod
property
The final path segment without its extension.
suffix
abstractmethod
property
The file extension including the leading dot, or empty string.
parent
abstractmethod
property
The location one level up.
join(rel)
abstractmethod
Resolve rel against this location and return a new location.
rel must be a relative path. Implementations normalize ..
and . segments and reject inputs that look absolute.
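The join contract above can be sketched with the standard library alone. The helper name `join_relative` and the specific tests for what "looks absolute" are assumptions made for illustration; the real implementations may apply different rules.

```python
import posixpath


def join_relative(base: str, rel: str) -> str:
    """Hypothetical helper: join `rel` onto `base`, normalizing . and .. segments."""
    looks_absolute = (
        rel.startswith(("/", "\\"))           # POSIX root or UNC-style prefix
        or "://" in rel                       # URI carrying a scheme
        or (len(rel) > 1 and rel[1] == ":")   # Windows drive letter such as C:
    )
    if looks_absolute:
        raise ValueError(f"expected a relative path, got {rel!r}")
    return posixpath.normpath(posixpath.join(base, rel))
```

Normalization alone does not stop `..` from climbing above the base (`join_relative("/proj", "../outside")` yields `/outside`), which is why a separate containment check such as is_relative_to is still needed.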
exists()
abstractmethod
Return whether the underlying file or directory exists.
read_text()
abstractmethod
Return the file contents decoded as UTF-8.
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | If the file does not exist. |
| OSError | For any other I/O failure. |
is_relative_to(other)
abstractmethod
Return whether this location is contained within other.
__str__()
abstractmethod
Return the canonical path or URI for diagnostics and logging.
__fspath__()
Return the string form for os.fspath consumers.
Remote implementations return their URI; using the result with the local filesystem will fail loudly, which is the desired behavior.
LocalConfigLocation
Bases: ConfigLocation
A ConfigLocation backed by a local filesystem path.
path
property
The underlying pathlib.Path.
name
property
The final path segment.
stem
property
The final path segment without its extension.
suffix
property
The file extension including the leading dot.
parent
property
The parent directory as a LocalConfigLocation.
__init__(path)
Wrap an absolute or relative Path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Path | The path to wrap. Stored as-is; resolution and existence checks are performed lazily by individual methods. | required |
join(rel)
Join rel to the underlying path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rel | str | Relative path to append. | required |
Returns:
| Type | Description |
|---|---|
| ConfigLocation | A new :class: |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
exists()
Return whether the path exists on the local filesystem.
read_text()
Read the file as UTF-8 text.
is_relative_to(other)
Return whether this path is contained within other.
Both sides are resolved to absolute paths before comparison so
relative segments such as .. are honored. Comparing across
location types always returns False.
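Resolving both sides before comparing is what makes `..` segments count. A minimal sketch using pathlib, assuming Python 3.9 or later; the function name `is_contained` and the example paths are hypothetical.

```python
from pathlib import Path


def is_contained(candidate: Path, root: Path) -> bool:
    """Resolve both paths to absolute form, then test containment."""
    return candidate.resolve().is_relative_to(root.resolve())


root = Path("project")
# Stays inside the project once ".." is collapsed.
inside = root / "threads" / ".." / "weaves" / "daily.weave"
# Escapes the project directory through "..".
outside = root / ".." / "elsewhere" / "secrets.yaml"
```

Here `is_contained(inside, root)` holds while `is_contained(outside, root)` does not, even though both paths start with the same textual prefix.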
__str__()
Return the underlying path as a string.
__repr__()
Developer-friendly representation.
__eq__(other)
Two locations are equal when their underlying paths are equal.
__hash__()
Hash by underlying path.
RemoteConfigLocation
Bases: ConfigLocation
A ConfigLocation backed by a Hadoop-accessible URI.
Operations route through the JVM org.apache.hadoop.fs.FileSystem API
available on the active SparkSession. The implementation works
for any scheme that Spark's Hadoop client can resolve, including
abfss://, wasbs://, s3a://, gs://, and file://.
uri
property
The underlying URI string.
name
property
The final /-delimited segment of the URI.
stem
property
The final segment with its extension stripped.
suffix
property
The file extension including the leading dot.
parent
property
The URI one level up.
Returns the same location when already at the authority root.
__init__(uri, spark)
Construct a remote location.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| uri | str | A fully qualified URI such as | required |
| spark | SparkSession | Active :class: | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
join(rel)
Join rel to this URI, normalizing . and .. segments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rel | str | Relative path to append. | required |
Returns:
| Type | Description |
|---|---|
| ConfigLocation | A new :class: |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
exists()
Return whether the underlying URI exists.
Wraps FileSystem.exists. JVM exceptions are propagated as
OSError so callers can handle filesystem errors uniformly.
read_text()
Read the file as UTF-8 text via Hadoop FileSystem.open.
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | If the underlying file does not exist. |
| OSError | For any other I/O failure. |
is_relative_to(other)
Return whether this URI lies within other.
Comparison is a normalized prefix match on the URI string. Comparing
across location types always returns False.
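A prefix match on URI strings needs care at segment boundaries, otherwise `.../proj` would appear to contain `.../project2`. The sketch below shows one way to do it; whether the real implementation guards the boundary this way is an assumption, and `uri_is_within` and the example URIs are hypothetical.

```python
def uri_is_within(uri: str, other: str) -> bool:
    """Prefix match that only succeeds on a whole-segment boundary."""
    child = uri.rstrip("/")
    parent = other.rstrip("/")
    return child == parent or child.startswith(parent + "/")


ROOT = "abfss://data@acct.dfs.core.windows.net/proj"  # illustrative URI only
```

With this helper, `ROOT + "/threads/a.thread"` is within `ROOT`, while `ROOT + "2/threads/a.thread"` is not.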
__str__()
Return the underlying URI.
__repr__()
Developer-friendly representation.
__eq__(other)
Two remote locations are equal when their URIs match.
__hash__()
Hash by URI.
ConfigError
Bases: WeevError
Base exception for configuration-related errors.
__init__(message, cause=None, file_path=None, config_key=None)
Initialize ConfigError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | Human-readable error message | required |
| cause | Exception \| None | Optional underlying exception | None |
| file_path | str \| None | Path to the config file where the error occurred | None |
| config_key | str \| None | Specific config key that caused the error | None |
__str__()
Return string representation with context.
DataValidationError
Bases: WeevError
Raised when a fatal-severity validation rule has failing rows.
When any row fails a validation rule declared with severity: fatal,
the thread aborts immediately without writing data. Downgrade to
severity: error to quarantine failing rows instead of aborting.
ExecutionError
Bases: WeevError
Base exception for execution-time errors.
Carries optional execution context to pinpoint where a failure occurred within a thread pipeline.
__init__(message, cause=None, thread_name=None, step_index=None, step_type=None, source_name=None)
Initialize ExecutionError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | Human-readable error message. | required |
| cause | Exception \| None | Optional underlying exception. | None |
| thread_name | str \| None | Name of the thread where the error occurred. | None |
| step_index | int \| None | Zero-based index of the pipeline step that failed. | None |
| step_type | str \| None | Step type key (e.g. "filter", "join") that failed. | None |
| source_name | str \| None | Source alias that caused the error. | None |
__str__()
Return string representation with execution context.
ModelValidationError
Bases: ConfigError
Raised when a fully resolved config fails to hydrate into a typed model.
This occurs after variable resolution and inheritance, when the concrete values are validated through the Pydantic domain model (Thread, Weave, or Loom). Semantic constraints that span multiple fields are checked here.
VariableResolutionError
Bases: ConfigError
Raised when a ${variable} reference cannot be resolved.
This occurs when a variable placeholder has no matching entry in the
parameter context and no default value (${var:-default}) is provided.
LogLevel
Bases: StrEnum
Configurable log level for weevr execution.
Controls the verbosity of structured logging output during pipeline execution. Maps to Python logging levels internally.
Loom
Bases: FrozenBase
A deployment unit containing weave references with optional shared defaults.
Thread
Bases: FrozenBase
Complete domain model for a thread configuration.
A thread is the smallest unit of work: one or more sources, a sequence of transformation steps, and a single target.
ConditionSpec
Bases: FrozenBase
A condition expression for conditional execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| when | str | Condition expression string. Supports parameter references ( |
Weave
Bases: FrozenBase
A collection of thread references with optional shared defaults.
ExecutionMode
Bases: StrEnum
Execution modes for Context.run().
Attributes:
| Name | Type | Description |
|---|---|---|
| EXECUTE | | Full execution: read, transform, write (default). |
| VALIDATE | | Config + DAG + source existence checks, no execution. |
| PLAN | | Build and return execution plans without executing. |
| PREVIEW | | Execute with sampled data, no writes. |
LoadedConfig
Wrapper around a hydrated config model returned by Context.load().
Provides access to the underlying model (Thread, Weave, or Loom) and a lazily-built execution plan for weave/loom configs. Proxies attribute access to the model for convenience.
Attributes:
| Name | Type | Description |
|---|---|---|
| model | Any | The hydrated domain model. |
| config_type | str | Config kind ( |
| config_name | str | Name derived from the config file path. |
model
property
The underlying Thread, Weave, or Loom model.
config_type
property
Config kind: "thread", "weave", or "loom".
config_name
property
Name derived from the config file path.
execution_plan
property
Lazily-built execution plans for weave/loom configs.
Returns None for thread configs. For weave configs, returns a
single-element list. For loom configs, returns one plan per weave.
__init__(model, config_type, config_name, threads=None, weaves=None)
Initialize with a hydrated model and its metadata.
flow(*, dark=None)
Return a thread flow diagram for thread configs.
Generates an inline SVG showing the thread's processing pipeline:
sources, joins, transforms, and target. Returns None for
weave/loom configs (use per-thread flow in result reports instead).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dark | bool \| None | Force dark ( | None |
Returns:
| Name | Type | Description |
|---|---|---|
| A | Any | class: |
| | Any | for non-thread configs. |
__getattr__(name)
Proxy attribute access to the underlying model.
RunResult
Unified result returned by Context.run() for all execution modes.
Provides a consistent interface regardless of config type (thread, weave,
loom) or execution mode (execute, validate, plan, preview). Mode-specific
fields are None when not applicable.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | | Aggregate outcome — |
| mode | | The execution mode that produced this result. |
| config_type | | Config kind that was executed ( |
| config_name | | Name derived from the config file path. |
| duration_ms | | Wall-clock duration in milliseconds. |
| detail | | Underlying engine result (execute mode only). |
| telemetry | | Structured telemetry data (execute mode only). |
| execution_plan | | Resolved execution plans (plan mode only). |
| preview_data | | Output DataFrames keyed by thread name (preview mode only). |
| validation_errors | | Error messages from validation checks (validate mode only). |
| warnings | list[str] | Non-fatal messages (e.g., zero threads matched a filter). |
__init__(*, status, mode, config_type, config_name, duration_ms=0, detail=None, telemetry=None, execution_plan=None, preview_data=None, validation_errors=None, warnings=None)
Initialize a RunResult with execution outcome and telemetry.
dag(*, dark=None)
Return a DAG diagram for plan mode results.
Returns a weevr.engine.display.DAGDiagram for plan mode,
or None for other modes. For multi-weave loom results, returns
a loom-level swimlane DAG.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dark | bool \| None | Force dark ( | None |
explain()
Return a detailed text breakdown of the execution plan.
Includes dependency provenance, cache targets, lookup schedule, and per-thread source/target/step summary. Sections with no data are omitted. Returns empty string for non-plan modes.
summary()
Return a formatted, human-readable execution summary.
_ResolvedConfig
dataclass
Intermediate container for resolved config data.
Context
Entry point for all weevr operations.
Wraps a SparkSession with a project reference, resolved parameters,
and execution configuration. Provides run() for execution and
load() for model inspection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession (required). | required |
| project | str \| Path | Project identifier. Accepts three forms: | required |
| params | dict[str, Any] \| None | Runtime parameter overrides. | None |
| log_level | str | Logging verbosity — | 'standard' |
| workspace | str \| None | Fabric workspace ID for cross-lakehouse resolution. | None |
| lakehouse | str \| None | Fabric lakehouse ID for cross-lakehouse resolution. | None |
spark
property
The underlying SparkSession.
params
property
Runtime parameter overrides.
log_level
property
Configured logging verbosity.
project_root
property
Resolved project root as a ConfigLocation.
Returns a LocalConfigLocation when the project is on the
local filesystem (Tier 1 default lakehouse or Tier 3 direct path)
and a RemoteConfigLocation when the project is qualified
by workspace and lakehouse (Tier 2 OneLake).
__init__(spark, project, *, params=None, log_level='standard', workspace=None, lakehouse=None)
Initialize a Context with a SparkSession and project reference.
load(path)
Load and validate a config file, returning a model wrapper.
Parses, resolves, and hydrates the config at path (relative to
the project root) without executing anything. The returned
LoadedConfig exposes the underlying model and, for
weave/loom configs, a lazily-built execution plan.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Path to a config file, relative to the project root. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| A | LoadedConfig | class: |
run(path, *, mode='execute', tags=None, threads=None, sample_rows=100)
Run a config file in the specified mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Path to a config file, relative to the project root. | required |
| mode | str | Execution mode — | 'execute' |
| tags | list[str] \| None | Run only threads matching any of these tags. | None |
| threads | list[str] \| None | Run only threads with these names. | None |
| sample_rows | int | Maximum rows for preview mode (default 100). | 100 |
Returns:
| Name | Type | Description |
|---|---|---|
| A | RunResult | class: |
Raises:
| Type | Description |
|---|---|
| ValueError | If mode is invalid or both tags and threads are provided. |
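The two ValueError conditions can be illustrated with a small stand-alone check. This is a sketch, not the library's code: the `Mode` enum is a hypothetical stand-in for ExecutionMode, its string values are assumed from the mode names used elsewhere on this page, and treating `None` as "not provided" is also an assumption.

```python
from enum import Enum


class Mode(str, Enum):
    """Hypothetical stand-in for ExecutionMode; values are assumed."""
    EXECUTE = "execute"
    VALIDATE = "validate"
    PLAN = "plan"
    PREVIEW = "preview"


def check_run_args(mode="execute", tags=None, threads=None) -> Mode:
    """Reject conflicting filters and unknown modes before doing any work."""
    if tags is not None and threads is not None:
        raise ValueError("pass either tags or threads, not both")
    try:
        return Mode(mode)
    except ValueError:
        valid = ", ".join(m.value for m in Mode)
        raise ValueError(f"invalid mode {mode!r}; expected one of: {valid}") from None
```

Validating arguments up front keeps a typo in `mode` from surfacing only after configs have been parsed and resolved.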
apply_inheritance(loom_defaults, weave_defaults, thread_config, *, loom_audit_templates=None, weave_audit_templates=None, loom_connections=None, weave_connections=None)
Apply multi-level inheritance cascade.
Cascade order (lowest to highest priority):
1. loom_defaults (lowest)
2. weave_defaults
3. thread_config (highest)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| loom_defaults | dict[str, Any] \| None | Defaults from loom level | required |
| weave_defaults | dict[str, Any] \| None | Defaults from weave level | required |
| thread_config | dict[str, Any] | Thread-specific config | required |
| loom_audit_templates | dict[str, Any] \| None | User-defined audit template definitions from loom | None |
| weave_audit_templates | dict[str, Any] \| None | User-defined audit template definitions from weave | None |
| loom_connections | dict[str, Any] \| None | Named connection definitions from loom top-level | None |
| weave_connections | dict[str, Any] \| None | Named connection definitions from weave top-level | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Fully merged config with thread values taking precedence |
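The cascade order can be pictured as successive dictionary updates, applied from lowest to highest priority. The sketch below assumes a shallow merge and ignores audit templates and connections; how the real function merges nested keys is not stated here, and the `cascade` name and the sample keys are hypothetical.

```python
from typing import Any, Optional


def cascade(
    loom_defaults: Optional[dict[str, Any]],
    weave_defaults: Optional[dict[str, Any]],
    thread_config: dict[str, Any],
) -> dict[str, Any]:
    """Shallow merge where later (higher-priority) layers overwrite earlier ones."""
    merged: dict[str, Any] = {}
    for layer in (loom_defaults, weave_defaults, thread_config):
        merged.update(layer or {})  # None layers contribute nothing
    return merged


merged = cascade(
    {"write_mode": "append", "retries": 1},  # loom level
    {"retries": 3},                          # weave level
    {"write_mode": "merge"},                 # thread level
)
```

In this example `merged` ends up with the thread's `write_mode` and the weave's `retries`.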
expand_foreach(steps)
Expand foreach macro blocks into repeated step sequences.
Each foreach block in the steps list is replaced by its template steps
repeated once per value, with {var} placeholders substituted.
Non-foreach entries pass through unchanged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| steps | list[dict[str, Any]] | Raw step list (dicts), possibly containing foreach blocks. | required |
Returns:
| Type | Description |
|---|---|
| list[dict[str, Any]] | Expanded step list with all foreach blocks replaced. |
Raises:
| Type | Description |
|---|---|
| ConfigError | If a foreach block is missing required fields. |
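To make the expansion concrete, here is a rough stand-alone sketch. The block layout (a `foreach` key holding `var`, `values`, and `steps`), the `cast` step, and the function name are all hypothetical; this reference does not spell out the real field names. ValueError stands in for ConfigError.

```python
from typing import Any

REQUIRED_FIELDS = ("var", "values", "steps")  # assumed field names


def _substitute(node: Any, var: str, value: Any) -> Any:
    """Replace every {var} placeholder inside strings, recursing into containers."""
    if isinstance(node, str):
        return node.replace("{" + var + "}", str(value))
    if isinstance(node, dict):
        return {k: _substitute(v, var, value) for k, v in node.items()}
    if isinstance(node, list):
        return [_substitute(v, var, value) for v in node]
    return node


def expand_foreach_sketch(steps: list[dict[str, Any]]) -> list[dict[str, Any]]:
    expanded: list[dict[str, Any]] = []
    for step in steps:
        block = step.get("foreach") if isinstance(step, dict) else None
        if block is None:
            expanded.append(step)  # non-foreach entries pass through unchanged
            continue
        missing = [f for f in REQUIRED_FIELDS if f not in block]
        if missing:
            raise ValueError(f"foreach block is missing fields: {missing}")
        for value in block["values"]:
            expanded.extend(_substitute(s, block["var"], value) for s in block["steps"])
    return expanded


steps = [
    {"filter": {"expr": "amount > 0"}},
    {"foreach": {"var": "col", "values": ["a", "b"],
                 "steps": [{"cast": {"column": "{col}", "type": "int"}}]}},
]
```

Calling `expand_foreach_sketch(steps)` keeps the filter step and emits one cast step per value.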
detect_config_type_from_extension(path)
Detect config type from file extension.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path \| ConfigLocation | Path or location of the config file. | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Config type string if the extension is a typed extension |
| str \| None | ( |
| str \| None | |
Raises:
| Type | Description |
|---|---|
| ConfigError | If the extension is |
extract_config_version(raw)
Extract and parse config_version from a config dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| raw | dict[str, Any] | Parsed config dictionary | required |
Returns:
| Type | Description |
|---|---|
| tuple[int, int] | Tuple of (major, minor) version numbers |
Raises:
| Type | Description |
|---|---|
| ConfigParseError | If config_version is missing or invalid format |
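A sketch of what such parsing might look like, assuming the value is written as "major.minor". The accepted format is not specified in this reference, so treat both the format and the `parse_config_version` name as assumptions; ValueError stands in for ConfigParseError.

```python
from typing import Any


def parse_config_version(raw: dict[str, Any]) -> tuple[int, int]:
    """Parse raw["config_version"] into (major, minor), assuming 'major.minor'."""
    value = raw.get("config_version")
    if value is None:
        raise ValueError("config_version is missing")
    parts = str(value).split(".")
    if len(parts) != 2 or not all(p.isdigit() for p in parts):
        raise ValueError(f"invalid config_version: {value!r}")
    return int(parts[0]), int(parts[1])
```

One practical caveat with this approach: an unquoted YAML value such as `1.10` is loaded as the float `1.1`, so quoting the version in the config file avoids losing the trailing zero.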
parse_yaml(path)
Parse a YAML file and return its contents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path \| ConfigLocation | A local path, a URI string, or a :class: | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Parsed YAML content as a dictionary. |
Raises:
| Type | Description |
|---|---|
| ConfigParseError | If the file is not found, unreadable, or contains invalid YAML syntax. |
validate_config_version(version, config_type)
Validate that the config version is supported.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| version | tuple[int, int] | Tuple of (major, minor) version | required |
| config_type | str | Type of config (thread, weave, loom, params) | required |
Raises:
| Type | Description |
|---|---|
| ConfigVersionError | If the major version doesn't match supported version |
build_param_context(runtime_params=None, config_defaults=None, fabric_context=None, entry_params=None)
Build parameter context with proper priority layering.
Priority order (highest to lowest):
1. runtime_params
2. entry_params (nested under param key for ${param.x} access)
3. config_defaults
4. fabric_context
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| runtime_params | dict[str, Any] \| None | Runtime parameter overrides. | None |
| config_defaults | dict[str, Any] \| None | Default parameters from config. | None |
| fabric_context | dict[str, Any] \| None | Fabric environment values keyed as | None |
| entry_params | dict[str, Any] \| None | ThreadEntry-level parameters injected under the | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Merged parameter context dictionary with dotted key access support. |
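The priority layering can be sketched by applying the layers from lowest to highest priority so that later updates win. This is an illustration under stated assumptions: the merge is shallow, the sample keys are invented, and `layer_params` is a hypothetical name rather than the library function.

```python
from typing import Any, Optional


def layer_params(
    runtime_params: Optional[dict[str, Any]] = None,
    config_defaults: Optional[dict[str, Any]] = None,
    fabric_context: Optional[dict[str, Any]] = None,
    entry_params: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build a context dict; higher-priority layers are applied last."""
    context: dict[str, Any] = {}
    context.update(fabric_context or {})      # 4. lowest priority
    context.update(config_defaults or {})     # 3.
    if entry_params:
        context["param"] = dict(entry_params)  # 2. nested for ${param.x}
    context.update(runtime_params or {})      # 1. highest priority
    return context


context = layer_params(
    runtime_params={"env": "prod"},
    config_defaults={"env": "dev", "batch": 10},
    fabric_context={"batch": 1, "workspace": "w-123"},
    entry_params={"region": "eu"},
)
```

Here the runtime `env` beats the config default, the config `batch` beats the Fabric value, and `region` is reachable under the `param` key.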
resolve_declared_params(param_specs, runtime_params, *, file_path=None)
Resolve loom/weave-level declared params: to a flat {name: value} dict.
Precedence per declared param:
- Value from runtime_params if supplied
- ParamSpec.default if set on the spec
- ConfigSchemaError if the param is required
- Omitted from the result if optional with no default
The returned dict is intended to bind under the param.* namespace via
build_param_context's entry_params argument, so that
${param.x} expressions resolve to the layered value.
Runtime keys not declared in param_specs are ignored — declared scope
is the only contract honored at this layer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| param_specs | dict[str, Any] \| None | Mapping of declared params ( | required |
| runtime_params | dict[str, Any] \| None | Caller-supplied values keyed by param name. | required |
| file_path | str \| None | Optional originating config file path for error messages. | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Resolved |
Raises:
| Type | Description |
|---|---|
| ConfigSchemaError | A required declared param has no runtime value and no spec default. The message includes the file path and param name. |
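The per-param precedence can be illustrated with plain dictionaries. In this sketch each spec is a dict with optional `default` and `required` keys, which is an assumed shape rather than the real ParamSpec model, and ValueError stands in for ConfigSchemaError.

```python
from typing import Any, Optional

_UNSET = object()  # distinguishes "no default" from a default of None


def resolve_declared_sketch(
    param_specs: Optional[dict[str, dict[str, Any]]],
    runtime_params: Optional[dict[str, Any]],
) -> dict[str, Any]:
    runtime = runtime_params or {}
    resolved: dict[str, Any] = {}
    for name, spec in (param_specs or {}).items():
        if name in runtime:                          # runtime value wins
            resolved[name] = runtime[name]
        elif spec.get("default", _UNSET) is not _UNSET:
            resolved[name] = spec["default"]         # then the spec default
        elif spec.get("required", False):
            raise ValueError(f"required param {name!r} has no value")
        # optional params without a default are simply omitted
    return resolved


specs = {"env": {"required": True}, "region": {"default": "eu"}, "note": {}}
resolved = resolve_declared_sketch(specs, {"env": "prod", "undeclared": 1})
```

The undeclared runtime key is dropped, matching the rule that only declared params are honored at this layer.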
resolve_references(config, config_type, project_root, runtime_params=None, visited=None)
Resolve references to other config files.
Handles both external references (ref key) and inline definitions
(name + body keys). Recursively loads referenced configs with
circular reference detection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | Config dict to resolve references in. | required |
| config_type | str | Type of this config ( | required |
| project_root | ConfigLocation \| Path | The | required |
| runtime_params | dict[str, Any] \| None | Runtime parameters to pass to child configs. | None |
| visited | set[str] \| None | Set of already-visited ref strings (for cycle detection). | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Config dict with resolved child configs attached under |
| dict[str, Any] | |
Raises:
| Type | Description |
|---|---|
| ReferenceResolutionError | If referenced file not found or circular reference detected. |
| ConfigError | If an inline definition is missing a |
resolve_variables(config, context, consumed_keys=None)
Recursively resolve variable references in config.
Supports:
- ${var} - simple variable reference (error if not found)
- ${var:-default} - variable with fallback default
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] \| list[Any] \| str \| Any | Config structure to resolve (dict, list, str, or primitive) | required |
| context | dict[str, Any] | Parameter context for variable lookup | required |
| consumed_keys | set[str] \| None | Optional set to track which context keys were consumed during resolution. When provided, each resolved dotted key is added to the set for post-resolution unused-param analysis. | None |
Returns:
| Type | Description |
|---|---|
| Any | Config with all variables resolved |
Raises:
| Type | Description |
|---|---|
| VariableResolutionError | If variable not found and no default provided |
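The two placeholder forms and the recursive walk can be sketched as follows. This is an approximation built on a regular expression, not the library's resolver: it always returns strings for substituted values, KeyError stands in for VariableResolutionError, and the names `resolve_sketch` and `_lookup` are hypothetical.

```python
import re
from typing import Any, Optional

# ${name} or ${name:-default}; name may be dotted, e.g. param.region
_VAR = re.compile(r"\$\{([^}:]+)(?::-([^}]*))?\}")


def _lookup(context: dict[str, Any], dotted: str) -> Any:
    node: Any = context
    for part in dotted.split("."):
        if not isinstance(node, dict) or part not in node:
            raise KeyError(dotted)
        node = node[part]
    return node


def resolve_sketch(config: Any, context: dict[str, Any],
                   consumed: Optional[set[str]] = None) -> Any:
    if isinstance(config, dict):
        return {k: resolve_sketch(v, context, consumed) for k, v in config.items()}
    if isinstance(config, list):
        return [resolve_sketch(v, context, consumed) for v in config]
    if not isinstance(config, str):
        return config  # numbers, booleans, None pass through unchanged

    def _replace(match: re.Match) -> str:
        key, default = match.group(1), match.group(2)
        try:
            value = _lookup(context, key)
        except KeyError:
            if default is None:
                raise KeyError(f"unresolved variable: {key}") from None
            return default
        if consumed is not None:
            consumed.add(key)  # record the dotted key that was actually used
        return str(value)

    return _VAR.sub(_replace, config)


used: set[str] = set()
resolved = resolve_sketch(
    {"path": "/data/${env}/${param.region}", "mode": "${mode:-append}", "limit": 3},
    {"env": "prod", "param": {"region": "eu"}},
    used,
)
```

After this call `used` holds only the keys that were found in the context, which is the information an unused-param check would need.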
validate_params(param_specs, context)
Validate parameters against their type specifications.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| param_specs | dict[str, Any] \| None | Parameter specifications from config | required |
| context | dict[str, Any] | Actual parameter values | required |
Raises:
| Type | Description |
|---|---|
| ConfigSchemaError | If required params missing or type mismatches |
validate_schema(raw, config_type)
Validate a raw config dict against the appropriate pre-resolution schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| raw | dict[str, Any] | Raw config dictionary (variables not yet resolved) | required |
| config_type | str | Type of config (thread, weave, loom, params) | required |
Returns:
| Type | Description |
|---|---|
| BaseModel | Validated Pydantic model instance |
Raises:
| Type | Description |
|---|---|
| ConfigSchemaError | If validation fails |
is_table_alias(path)
Return True if path looks like a table alias (e.g. schema.table).
Table aliases are dot-separated identifiers resolved by the Spark metastore.
File paths contain slashes or URI schemes (://).
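One way to express this heuristic is a slash/scheme check followed by an identifier pattern. The sketch below is an assumption about how such a test could be written, not the library's actual rule, and `looks_like_table_alias` is a hypothetical name.

```python
import re

# One or more identifiers separated by dots, e.g. schema.table
_DOTTED_IDENTIFIERS = re.compile(r"[A-Za-z_]\w*(\.[A-Za-z_]\w*)*")


def looks_like_table_alias(path: str) -> bool:
    """Treat anything with a slash or URI scheme as a file path, not an alias."""
    if "/" in path or "\\" in path or "://" in path:
        return False
    return _DOTTED_IDENTIFIERS.fullmatch(path) is not None
```

A purely syntactic test like this cannot tell `sales.orders` from a bare file name such as `orders.parquet`; how the real function treats that case is not stated in this reference.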
execute_thread(spark, thread, collector=None, parent_span_id=None, cached_lookups=None, weave_lookups=None, resolved_params=None, loom_name='', weave_name='', column_sets=None, column_set_defs=None, connections=None)
Execute a single thread from sources through transforms to a Delta target.
Execution order:
1. Initialize thread-level resources (variables, lookups, column_sets).
2. Run thread-level pre_steps.
3. Resolve lookup-based sources from cached or on-demand DataFrames.
4. Read all remaining declared sources into DataFrames.
- For incremental_watermark: load prior HWM, apply filter, capture new HWM.
- For cdc: read via CDF or generic CDC source.
5. Set the primary (first) source as the working DataFrame.
6. Run pipeline steps against the working DataFrame.
7. Run validation rules (if configured) — quarantine or abort on failures.
8. Apply naming normalization (if configured).
9. Compute business keys and hashes if configured.
10. Resolve the target write path.
11. Apply target column mapping.
12. Inject audit columns (if configured; bypasses mapping mode).
13. Write to the Delta target.
- For cdc: use CDC merge routing instead of standard write.
14. Persist watermark state (if applicable).
15. Run post-write assertions (if configured).
16. Write exports (secondary outputs, if configured).
17. Run thread-level post_steps.
18. Build telemetry and return ThreadResult.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| thread | Thread | Thread configuration to execute. | required |
| collector | SpanCollector \| None | Optional span collector for telemetry. When provided, a thread span is created and finalized. | None |
| parent_span_id | str \| None | Optional parent span ID for trace tree linkage. | None |
| cached_lookups | dict[str, Any] \| None | Pre-materialized lookup DataFrames keyed by lookup name. Runtime type is | None |
| weave_lookups | dict[str, Lookup] \| None | Weave-level lookup definitions for on-demand resolution of non-materialized lookups. | None |
| resolved_params | dict[str, Any] \| None | Runtime parameter values for telemetry capture. Stored on ThreadTelemetry for thread-level runs. | None |
| loom_name | str | Loom name for audit column context variables. | '' |
| weave_name | str | Weave name for audit column context variables. Derived from the prefix of | '' |
| column_sets | dict[str, dict[str, str]] \| None | Pre-resolved column set mappings keyed by name. Passed through to | None |
| column_set_defs | dict[str, ColumnSet] \| None | Column set model instances keyed by name. Passed through to | None |
| connections | dict[str, OneLakeConnection] \| None | Named connection declarations keyed by connection name. When provided, source reads, lookup materialization, target path resolution, and export writes use connection-based paths where configured. | None |
Returns:
| Type | Description |
|---|---|
| ThreadResult | class: |
| ThreadResult | and optional telemetry. |
Raises:
| Type | Description |
|---|---|
| ExecutionError | If any step fails, with |
| DataValidationError | If a fatal-severity validation rule fails. |
| ExportError | If an export with |
build_plan(weave_name, threads, thread_entries, lookups=None)
Build an execution plan from a weave's threads and declared/inferred dependencies.
Performs DAG construction, cycle detection, topological sort, and cache analysis
to produce an immutable ExecutionPlan.
When lookups are provided, the planner also:
- Infers implicit dependencies between threads mediated by lookups (a thread consuming a lookup whose source is produced by another thread depends on that producer).
- Computes a lookup schedule mapping each execution group boundary to the lookups that should be materialized before that group runs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weave_name | str | Name of the weave being planned. | required |
| threads | dict[str, Thread] | Mapping of thread name to :class: | required |
| thread_entries | list[ThreadEntry] | Ordered thread entries from the weave config, which may carry explicit dependency declarations. | required |
| lookups | dict[str, Lookup] \| None | Optional weave-level lookup definitions. When provided, lookup-mediated dependencies are inferred and a materialization schedule is computed. | None |
Returns:
| Type | Description |
|---|---|
| ExecutionPlan | An immutable :class: |
Raises:
| Type | Description |
|---|---|
| ConfigError | If a circular dependency is detected (with cycle path in the message), or if an explicit dependency references a thread that does not exist in |
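The planning steps (dependency validation, cycle detection, topological grouping, cache analysis) can be approximated with a Kahn-style layering over a plain dependency map. This is a simplified sketch, not the planner itself: it ignores lookups, the function names and thread names are hypothetical, and ValueError stands in for ConfigError.

```python
def plan_groups(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group threads into layers; each layer depends only on earlier layers."""
    remaining = {name: set(deps) for name, deps in dependencies.items()}
    unknown = {d for deps in remaining.values() for d in deps} - set(remaining)
    if unknown:
        raise ValueError(f"dependencies reference unknown threads: {sorted(unknown)}")
    order: list[list[str]] = []
    while remaining:
        ready = sorted(name for name, deps in remaining.items() if not deps)
        if not ready:  # every remaining thread waits on another: a cycle
            raise ValueError(f"circular dependency among: {sorted(remaining)}")
        order.append(ready)
        for name in ready:
            del remaining[name]
        for deps in remaining.values():
            deps.difference_update(ready)
    return order


def cache_targets(dependencies: dict[str, list[str]]) -> list[str]:
    """Threads that feed two or more downstream threads."""
    fan_out: dict[str, int] = {}
    for deps in dependencies.values():
        for upstream in deps:
            fan_out[upstream] = fan_out.get(upstream, 0) + 1
    return sorted(name for name, count in fan_out.items() if count >= 2)


deps = {
    "orders": [], "customers": [],
    "joined": ["orders", "customers"],
    "report": ["joined"], "audit": ["joined"],
}
```

For this map the groups come out as the two sources, then `joined`, then the two consumers, and `joined` is the only cache target because it feeds both `report` and `audit`.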
execute_loom(spark, loom, weaves, threads, params=None)
Execute weaves sequentially in declared order.
Iterates the weaves listed in loom.weaves, builds an
weevr.engine.planner.ExecutionPlan for each, and executes them
via execute_weave. Creates a root span collector for telemetry
and passes it through the execution tree.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| loom | Loom | Loom configuration declaring the ordered list of weave names. | required |
| weaves | dict[str, Weave] | Mapping of weave name to :class: | required |
| threads | dict[str, dict[str, Thread]] | Nested mapping of | required |
| params | dict[str, Any] \| None | Parameters for condition evaluation at weave and thread levels. | None |
Returns:
| Type | Description |
|---|---|
| LoomResult | class: |
| LoomResult | per-weave results. |
execute_weave(spark, plan, threads, collector=None, parent_span_id=None, thread_conditions=None, params=None, weave_span_label=None, pre_steps=None, post_steps=None, lookups=None, variables=None, loom_name='', weave_name='', column_set_defs=None, pre_cached_lookups=None, connections=None)
Execute threads according to the execution plan.
Processes each parallel group sequentially, submitting threads within a
group to a concurrent.futures.ThreadPoolExecutor for concurrent
execution. Respects on_failure config on each thread and manages the
cache lifecycle via weevr.engine.cache_manager.CacheManager.
When hooks are provided, the lifecycle is:
1. Initialize VariableContext from variables.
2. Execute pre_steps.
3. Materialize lookups (materialize=True).
4. Materialize column sets.
5. Execute threads (with cached lookup DataFrames).
6. Execute post_steps.
7. Cleanup lookups.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| plan | ExecutionPlan | Immutable :class: | required |
| threads | dict[str, Thread] | Mapping of thread name to :class: | required |
| collector | SpanCollector \| None | Optional span collector for telemetry. When provided, per-thread collectors are created and merged after execution. | None |
| parent_span_id | str \| None | Optional parent span ID for trace tree linkage. | None |
| thread_conditions | dict[str, ConditionSpec] \| None | Mapping of thread name to condition spec. Threads with a condition that evaluates to False are skipped. | None |
| params | dict[str, Any] \| None | Parameters for condition evaluation. | None |
| weave_span_label | str \| None | Label for the weave telemetry span. When set, used instead of | None |
| pre_steps | Sequence[HookStep] \| None | Optional pre-execution hook steps. | None |
| post_steps | Sequence[HookStep] \| None | Optional post-execution hook steps. | None |
| lookups | dict[str, Lookup] \| None | Optional weave-level lookup definitions. | None |
| variables | dict[str, VariableSpec] \| None | Optional weave-level variable specs. | None |
| loom_name | str | Loom name passed to threads for audit column context. | '' |
| weave_name | str | Weave name passed to threads for audit column context. | '' |
| column_set_defs | dict[str, ColumnSet] \| None | Merged column set definitions (loom-level merged with weave-level, weave wins) to be materialized and forwarded to each thread for rename step resolution. | None |
| pre_cached_lookups | dict[str, Any] \| None | Already-materialized lookup DataFrames from a parent scope (e.g. loom level). Seeded into the weave's cached lookup dict so threads can reference them without re-materializing. | None |
| connections | dict[str, OneLakeConnection] \| None | Merged loom + weave connections forwarded to | None |
Returns:
| Type | Description |
|---|---|
| WeaveResult | A WeaveResult with per-thread results. |
configure_logging(log_level=LogLevel.STANDARD, logger_name='weevr')
¶
Configure a weevr logger with structured JSON output.
Sets up a logger with a StructuredJsonFormatter handler at the
appropriate Python logging level based on the weevr LogLevel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| log_level | LogLevel | weevr log level to configure. | STANDARD |
| logger_name | str | Name of the logger to configure. | 'weevr' |
Returns:
| Type | Description |
|---|---|
| Logger | Configured logger instance. |
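As a rough illustration of what "structured JSON output" can look like with the standard library alone, the sketch below attaches a JSON formatter to a named logger. `JsonLineFormatter` and `configure_json_logger` are hypothetical stand-ins; they are not weevr's StructuredJsonFormatter or configure_logging, and the field names in the JSON payload are assumptions.

```python
import json
import logging


class JsonLineFormatter(logging.Formatter):
    """Hypothetical formatter: renders each record as one JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps(
            {
                "level": record.levelname,
                "logger": record.name,
                "message": record.getMessage(),
            }
        )


def configure_json_logger(name: str = "demo", level: int = logging.INFO) -> logging.Logger:
    """Return a logger that emits JSON lines at the given stdlib level."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Avoid stacking duplicate handlers when called more than once.
    if not any(isinstance(h.formatter, JsonLineFormatter) for h in logger.handlers):
        handler = logging.StreamHandler()
        handler.setFormatter(JsonLineFormatter())
        logger.addHandler(handler)
    return logger


configure_json_logger("demo").info("pipeline started")
```

How a weevr LogLevel maps onto a Python logging level is not shown here, so the sketch takes a stdlib level directly.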
Result Types¶
weevr.result
¶
Unified result types for the weevr Python API.
ExecutionPlan
¶
Bases: FrozenBase
Immutable execution plan produced by the planner.
Attributes:
| Name | Type | Description |
|---|---|---|
| weave_name | str | Name of the weave this plan was built for. |
| threads | list[str] | All thread names in the weave. |
| dependencies | dict[str, list[str]] | Maps each thread to the list of threads it depends on. |
| dependents | dict[str, list[str]] | Maps each thread to the list of threads that depend on it. |
| execution_order | list[list[str]] | Parallel execution groups in topological order. Each inner list contains threads that can run concurrently. |
| cache_targets | list[str] | Thread names whose output should be cached because they feed two or more downstream threads. |
| inferred_dependencies | dict[str, list[str]] | The auto-inferred subset of |
| explicit_dependencies | dict[str, list[str]] | The explicitly declared subset of |
| lookup_schedule | dict[int, list[str]] \| None | Maps execution group indices to lookups that should be materialized at that boundary. Key 0 means before the first group; key N (N > 0) means after group N-1 finishes. |
| lookup_producers | dict[str, str \| None] \| None | Maps lookup names to the thread that produces their source data, or |
| lookup_consumers | dict[str, list[str]] \| None | Maps lookup names to the list of threads that consume each lookup. Only populated when |
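To make the relationship between `dependencies`, `dependents`, `execution_order`, and `cache_targets` concrete, here is a minimal sketch that derives the last three from a dependency map using layered topological sorting. `plan_groups` is a hypothetical helper, not the weevr planner, and it assumes every dependency also appears as a key in the map.

```python
def plan_groups(
    dependencies: dict[str, list[str]],
) -> tuple[dict[str, list[str]], list[list[str]], list[str]]:
    """Derive dependents, parallel groups, and cache targets from dependencies."""
    # Invert the dependency map: who depends on each thread?
    dependents: dict[str, list[str]] = {thread: [] for thread in dependencies}
    for thread, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(thread)

    # Peel off threads whose dependencies are all satisfied, one group at a time.
    remaining = {thread: set(deps) for thread, deps in dependencies.items()}
    execution_order: list[list[str]] = []
    while remaining:
        ready = sorted(t for t, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("dependency cycle detected")
        execution_order.append(ready)
        for thread in ready:
            del remaining[thread]
        for deps in remaining.values():
            deps.difference_update(ready)

    # A thread feeding two or more downstream threads is a cache candidate.
    cache_targets = sorted(t for t, ds in dependents.items() if len(ds) >= 2)
    return dependents, execution_order, cache_targets


deps = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
dependents, order, cache = plan_groups(deps)
print(order)  # [['a'], ['b', 'c'], ['d']]
print(cache)  # ['a']
```

In this diamond-shaped example, `b` and `c` land in the same group because neither depends on the other, and `a` is the only cache target because it feeds both.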
dag(*, dark=None)
¶
Return an inline SVG DAG diagram of this execution plan.
The returned DAGDiagram auto-renders in notebooks
via _repr_svg_() and can be exported via save().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dark | bool \| None | Force dark ( | None |
LoomResult
¶
Bases: FrozenBase
Immutable record of a completed loom execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | Literal['success', 'failure', 'partial'] | Aggregate outcome: |
| loom_name | str | Name of the loom that was executed. |
| weave_results | list[WeaveResult] | Results for each weave that was executed. |
| duration_ms | int | Wall-clock duration of the loom execution in milliseconds. |
| telemetry | LoomTelemetry \| None | Loom-level telemetry aggregated from weave telemetry. |
ThreadResult
¶
Bases: FrozenBase
Immutable record of a completed thread execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | Literal['success', 'failure', 'skipped'] | Outcome of the execution: |
| thread_name | str | Name of the thread that was executed. |
| rows_written | int | Number of rows in the DataFrame at write time. |
| write_mode | str | The write mode used ( |
| target_path | str | Physical path of the Delta table that was written. |
| telemetry | ThreadTelemetry \| None | Thread-level telemetry with validation/assertion results and row counts. |
| skip_reason | str \| None | The condition expression that caused the thread to be skipped. |
| error | str \| None | Error message when the thread failed, |
| output_schema | list[tuple[str, str]] \| None | Column names and Spark data types captured from the output DataFrame before write. Each tuple is |
| samples | dict[str, list[dict[str, Any]]] \| None | Data samples captured before write, keyed by category. Contains |
| drift_report | dict[str, Any] \| None | Schema drift report captured before write when schema drift detection is active. |
| warp_findings | list[dict[str, str]] \| None | Warp enforcement findings from this run; each entry names a column and the mismatch reason. |
WeaveResult
¶
Bases: FrozenBase
Immutable record of a completed weave execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | Literal['success', 'failure', 'partial', 'skipped'] | Aggregate outcome: |
| weave_name | str | Name of the weave that was executed. |
| thread_results | list[ThreadResult] | Results for each thread that was executed (not skipped). |
| threads_skipped | list[str] | Names of threads that were skipped due to upstream failure. |
| duration_ms | int | Wall-clock duration of the weave execution in milliseconds. |
| telemetry | WeaveTelemetry \| None | Weave-level telemetry aggregated from thread telemetry. |
| skip_reason | str \| None | The condition expression that caused the weave to be skipped. |
LoomTelemetry
¶
Bases: FrozenBase
Telemetry data composed into a LoomResult.
Attributes:
| Name | Type | Description |
|---|---|---|
| span | ExecutionSpan | The execution span for this loom. |
| weave_telemetry | dict[str, WeaveTelemetry] | Per-weave telemetry keyed by weave name. |
| resolved_params | dict[str, Any] \| None | Runtime parameter values that drove this execution. Populated on the outermost telemetry object only (loom-level runs). |
ThreadTelemetry
¶
Bases: FrozenBase
Telemetry data composed into a ThreadResult.
Attributes:
| Name | Type | Description |
|---|---|---|
| span | ExecutionSpan | The execution span for this thread. |
| validation_results | list[ValidationResult] | Results of pre-write validation rules. |
| assertion_results | list[AssertionResult] | Results of post-write assertions. |
| rows_read | int | Total rows read from sources. |
| rows_written | int | Rows written to the target. |
| rows_quarantined | int | Rows written to the quarantine table. |
| rows_after_transforms | int | Row count after transforms, before validation. |
| load_mode | str \| None | Load mode used (full, incremental_watermark, etc.). |
| watermark_column | str \| None | Watermark column name, if applicable. |
| watermark_previous_value | str \| None | Prior high-water-mark (HWM) value before this run. |
| watermark_new_value | str \| None | New HWM value captured during this run. |
| watermark_persisted | bool | Whether watermark state was successfully persisted. |
| watermark_first_run | bool | Whether this was the first run (no prior state). |
| cdc_inserts | int \| None | Number of CDC insert operations. |
| cdc_updates | int \| None | Number of CDC update operations. |
| cdc_deletes | int \| None | Number of CDC delete operations. |
| resolved_params | dict[str, Any] \| None | Runtime parameter values that drove this execution. Populated on the outermost telemetry object only (thread-level runs). |
| audit_columns_applied | list[str] | Names of audit columns injected into the output DataFrame for this thread. |
| export_results | list[ExportResult] | Per-export write results, one per export configured on the thread. |
WeaveTelemetry
¶
Bases: FrozenBase
Telemetry data composed into a WeaveResult.
Attributes:
| Name | Type | Description |
|---|---|---|
| span | ExecutionSpan | The execution span for this weave. |
| thread_telemetry | dict[str, ThreadTelemetry] | Per-thread telemetry keyed by thread name. |
| hook_results | list[Any] | Results from pre/post hook step execution. |
| lookup_results | list[Any] | Results from lookup materialization. |
| column_set_results | list[ColumnSetResult] | Results from column set resolution. |
| variables | dict[str, Any] | Final variable values at end of weave execution. |
| resolved_params | dict[str, Any] \| None | Runtime parameter values that drove this execution. Populated on the outermost telemetry object only (weave-level runs). |
FormattedText
¶
Bases: str
String subclass that displays without quotes in REPLs and notebooks.
Overrides __repr__ so that interactive environments (Python REPL,
Jupyter, Fabric notebooks) render the text with real newlines instead
of showing escaped \n sequences inside quotes.
Behaves identically to str in all other contexts — comparisons,
slicing, formatting, print(), and serialization are unaffected.
__repr__()
¶
Return the string value without quoting or escaping.
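The behavior described above can be reproduced with a very small `str` subclass. The class below is a hypothetical stand-in named `PlainReprText` to make clear it is a sketch of the technique, not the library's FormattedText source.

```python
class PlainReprText(str):
    """Hypothetical str subclass whose repr is the raw text itself."""

    def __repr__(self) -> str:
        # Return the underlying text unquoted and unescaped, so REPLs and
        # notebooks show real newlines instead of '\n' inside quotes.
        return str.__str__(self)


text = PlainReprText("line one\nline two")
plain = "line one\nline two"

print(repr(plain))  # quoted, with an escaped \n
print(repr(text))   # two real lines, no quotes
```

Everything except `__repr__` is inherited from `str`, which is why comparisons, slicing, and `print()` behave as usual.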
ExecutionMode
¶
Bases: StrEnum
Execution modes for Context.run().
Attributes:
| Name | Type | Description |
|---|---|---|
| EXECUTE | | Full execution: read, transform, write (default). |
| VALIDATE | | Config + DAG + source existence checks, no execution. |
| PLAN | | Build and return execution plans without executing. |
| PREVIEW | | Execute with sampled data, no writes. |
RunResult
¶
Unified result returned by Context.run() for all execution modes.
Provides a consistent interface regardless of config type (thread, weave,
loom) or execution mode (execute, validate, plan, preview). Mode-specific
fields are None when not applicable.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | | Aggregate outcome: |
| mode | | The execution mode that produced this result. |
| config_type | | Config kind that was executed ( |
| config_name | | Name derived from the config file path. |
| duration_ms | | Wall-clock duration in milliseconds. |
| detail | | Underlying engine result (execute mode only). |
| telemetry | | Structured telemetry data (execute mode only). |
| execution_plan | | Resolved execution plans (plan mode only). |
| preview_data | | Output DataFrames keyed by thread name (preview mode only). |
| validation_errors | | Error messages from validation checks (validate mode only). |
| warnings | list[str] | Non-fatal messages (e.g., zero threads matched a filter). |
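Because the mode-specific fields are None outside their own mode, calling code typically picks the field that matches `mode`. The sketch below shows one way to do that with stand-in types: `Mode`, `DemoResult`, and `mode_payload` are hypothetical, and the lowercase member values are an assumption based on the mode names listed above. It uses `(str, Enum)` rather than `StrEnum` so it also runs on Python versions before 3.11.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class Mode(str, Enum):
    # Assumed values; the real ExecutionMode members may differ.
    EXECUTE = "execute"
    VALIDATE = "validate"
    PLAN = "plan"
    PREVIEW = "preview"


@dataclass
class DemoResult:
    """Stand-in carrying only the mode-specific fields of a run result."""

    mode: Mode
    detail: Any = None
    execution_plan: Any = None
    preview_data: Any = None
    validation_errors: Any = None


# Which field carries the payload for each mode, per the attribute table.
_FIELD_BY_MODE = {
    Mode.EXECUTE: "detail",
    Mode.VALIDATE: "validation_errors",
    Mode.PLAN: "execution_plan",
    Mode.PREVIEW: "preview_data",
}


def mode_payload(result: DemoResult) -> Any:
    """Return the field that is populated for the result's mode."""
    return getattr(result, _FIELD_BY_MODE[result.mode])


plan_result = DemoResult(Mode.PLAN, execution_plan=["plan-for-orders"])
print(mode_payload(plan_result))
```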
__init__(*, status, mode, config_type, config_name, duration_ms=0, detail=None, telemetry=None, execution_plan=None, preview_data=None, validation_errors=None, warnings=None)
¶
Initialize a RunResult with execution outcome and telemetry.
dag(*, dark=None)
¶
Return a DAG diagram for plan mode results.
Returns a weevr.engine.display.DAGDiagram for plan mode,
or None for other modes. For multi-weave loom results, returns
a loom-level swimlane DAG.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dark | bool \| None | Force dark ( | None |
explain()
¶
Return a detailed text breakdown of the execution plan.
Includes dependency provenance, cache targets, lookup schedule, and per-thread source/target/step summary. Sections with no data are omitted. Returns empty string for non-plan modes.
summary()
¶
Return a formatted, human-readable execution summary.
LoadedConfig
¶
Wrapper around a hydrated config model returned by Context.load().
Provides access to the underlying model (Thread, Weave, or Loom) and a lazily-built execution plan for weave/loom configs. Proxies attribute access to the model for convenience.
Attributes:
| Name | Type | Description |
|---|---|---|
| model | Any | The hydrated domain model. |
| config_type | str | Config kind ( |
| config_name | str | Name derived from the config file path. |
model
property
¶
The underlying Thread, Weave, or Loom model.
config_type
property
¶
Config kind: "thread", "weave", or "loom".
config_name
property
¶
Name derived from the config file path.
execution_plan
property
¶
Lazily-built execution plans for weave/loom configs.
Returns None for thread configs. For weave configs, returns a
single-element list. For loom configs, returns one plan per weave.
__init__(model, config_type, config_name, threads=None, weaves=None)
¶
Initialize with a hydrated model and its metadata.
flow(*, dark=None)
¶
Return a thread flow diagram for thread configs.
Generates an inline SVG showing the thread's processing pipeline:
sources, joins, transforms, and target. Returns None for
weave/loom configs (use per-thread flow in result reports instead).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dark | bool \| None | Force dark ( | None |
Returns:
| Type | Description |
|---|---|
| Any | A thread flow diagram for thread configs, or None for non-thread configs. |
__getattr__(name)
¶
Proxy attribute access to the underlying model.
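The two conveniences described for this wrapper, attribute proxying and a lazily built plan, can be sketched as follows. `ModelWrapper` and its `plan_builder` argument are hypothetical; the real LoadedConfig constructor takes different arguments and its internals are not shown in this reference.

```python
from types import SimpleNamespace
from typing import Any, Callable


class ModelWrapper:
    """Hypothetical wrapper: proxies attributes and builds a plan on demand."""

    def __init__(self, model: Any, plan_builder: Callable[[Any], Any]) -> None:
        self._model = model
        self._plan_builder = plan_builder
        self._plan: Any = None
        self._plan_built = False

    @property
    def execution_plan(self) -> Any:
        # Build once on first access, then reuse the cached value.
        if not self._plan_built:
            self._plan = self._plan_builder(self._model)
            self._plan_built = True
        return self._plan

    def __getattr__(self, name: str) -> Any:
        # __getattr__ runs only when normal lookup fails. Refusing private
        # names avoids infinite recursion if _model is not set yet.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._model, name)


build_calls: list[str] = []


def fake_builder(model: Any) -> list[str]:
    build_calls.append(model.name)
    return [f"plan:{model.name}"]


wrapped = ModelWrapper(SimpleNamespace(name="orders"), fake_builder)
print(wrapped.name)            # proxied to the underlying model
print(wrapped.execution_plan)  # built on first access
print(wrapped.execution_plan)  # cached, builder not called again
```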
_format_duration(ms)
¶
Format milliseconds as a human-readable duration string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ms | int \| float | Duration in milliseconds. | required |
Returns:
| Type | Description |
|---|---|
| str | Formatted string like |
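The example output format is cut off in this reference, so the sketch below only illustrates the general idea of turning milliseconds into a readable string. `format_ms` and its output shapes ("250ms", "1.5s", "2m 5s") are assumptions and may not match what `_format_duration` actually returns.

```python
def format_ms(ms: float) -> str:
    """Hypothetical formatter: pick a unit based on the size of the duration."""
    if ms < 1000:
        return f"{int(ms)}ms"
    # Round first so values such as 59,990 ms roll over into the minutes branch.
    seconds = round(ms / 1000, 1)
    if seconds < 60:
        return f"{seconds:.1f}s"
    minutes, rem = divmod(int(seconds), 60)
    return f"{minutes}m {rem}s"


for sample in (250, 1500, 125000):
    print(sample, "->", format_ms(sample))
```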