
Context API

The Context class is the primary entry point for running weevr pipelines. It manages configuration loading, parameter injection, and orchestrates thread, weave, and loom execution. The run() method returns a RunResult, and load() returns a LoadedConfig — both documented below.

weevr.context

Context class — the user-facing entry point for weevr.

logger = logging.getLogger(__name__) module-attribute

ConfigLocation

Bases: ABC

Abstract reference to a config file or directory.

Implementations encapsulate a single addressing scheme (local filesystem or remote Hadoop URI) and expose the minimal surface area the config pipeline needs: joining, existence checks, text reads, name and parent derivation, and a containment check used for path-traversal protection.

name abstractmethod property

The final path segment, including any extension.

stem abstractmethod property

The final path segment without its extension.

suffix abstractmethod property

The file extension including the leading dot, or empty string.

parent abstractmethod property

The location one level up.

join(rel) abstractmethod

Resolve rel against this location and return a new location.

rel must be a relative path. Implementations normalize .. and . segments and reject inputs that look absolute.

exists() abstractmethod

Return whether the underlying file or directory exists.

read_text() abstractmethod

Return the file contents decoded as UTF-8.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

OSError

For any other I/O failure.

is_relative_to(other) abstractmethod

Return whether this location is contained within other.

__str__() abstractmethod

Return the canonical path or URI for diagnostics and logging.

__fspath__()

Return the string form for os.fspath consumers.

Remote implementations return their URI; using the result with the local filesystem will fail loudly, which is the desired behavior.

LocalConfigLocation

Bases: ConfigLocation

A ConfigLocation backed by a local filesystem path.

path property

The underlying pathlib.Path.

name property

The final path segment.

stem property

The final path segment without its extension.

suffix property

The file extension including the leading dot.

parent property

The parent directory as a LocalConfigLocation.

__init__(path)

Wrap an absolute or relative Path.

Parameters:

Name Type Description Default
path Path

The path to wrap. Stored as-is; resolution and existence checks are performed lazily by individual methods.

required

join(rel)

Join rel to the underlying path.

Parameters:

Name Type Description Default
rel str

Relative path to append.

required

Returns:

Type Description
ConfigLocation

A new LocalConfigLocation.

Raises:

Type Description
ValueError

If rel is an absolute path.

exists()

Return whether the path exists on the local filesystem.

read_text()

Read the file as UTF-8 text.

is_relative_to(other)

Return whether this path is contained within other.

Both sides are resolved to absolute paths before comparison so relative segments such as .. are honored. Comparing across location types always returns False.
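Here is a minimal sketch of this resolve-then-compare check. It uses only pathlib, and Path.is_relative_to requires Python 3.9 or later. The helper is_contained is hypothetical and not part of weevr. It mirrors the documented behavior for local paths only.

```python
from pathlib import Path


def is_contained(candidate: Path, root: Path) -> bool:
    """Hypothetical helper: True if candidate lies inside root.

    Both paths are resolved first, so '..' segments are collapsed before the
    comparison and cannot be used to escape the project root.
    """
    return candidate.resolve().is_relative_to(root.resolve())
```

The resolve() calls matter. On unresolved paths, Path("/srv/proj/../secret").is_relative_to(Path("/srv/proj")) returns True. This happens because pathlib compares segments lexically and does not collapse "..".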

__str__()

Return the underlying path as a string.

__repr__()

Developer-friendly representation.

__eq__(other)

Two locations are equal when their underlying paths are equal.

__hash__()

Hash by underlying path.

RemoteConfigLocation

Bases: ConfigLocation

A ConfigLocation backed by a Hadoop-accessible URI.

Operations route through the JVM org.apache.hadoop.fs.FileSystem API available on the active SparkSession. The implementation works for any scheme that Spark's Hadoop client can resolve, including abfss://, wasbs://, s3a://, gs://, and file://.

uri property

The underlying URI string.

name property

The final /-delimited segment of the URI.

stem property

The final segment with its extension stripped.

suffix property

The file extension including the leading dot.

parent property

The URI one level up.

Returns the same location when already at the authority root.
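The string handling behind name, stem, suffix and parent can be sketched with the standard library alone. This is a hypothetical illustration, not RemoteConfigLocation's code. The helper names are invented. Returning an empty name at the authority root is an assumption.

```python
import posixpath


def _split(uri: str) -> tuple[str, str, list[str]]:
    """Split 'scheme://authority/a/b' into scheme, authority and path segments."""
    scheme, rest = uri.split("://", 1)
    authority, _, path = rest.partition("/")
    return scheme, authority, [seg for seg in path.split("/") if seg]


def uri_name(uri: str) -> str:
    """Final '/'-delimited segment, or '' at the authority root."""
    _, _, segments = _split(uri)
    return segments[-1] if segments else ""


def uri_stem_suffix(uri: str) -> tuple[str, str]:
    """Split the final segment into (stem, suffix), e.g. ('main', '.weave')."""
    return posixpath.splitext(uri_name(uri))


def uri_parent(uri: str) -> str:
    """URI one level up; the same URI when already at the authority root."""
    scheme, authority, segments = _split(uri)
    if not segments:
        return uri
    return f"{scheme}://{authority}/" + "/".join(segments[:-1])
```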

__init__(uri, spark)

Construct a remote location.

Parameters:

Name Type Description Default
uri str

A fully qualified URI such as abfss://workspace@onelake.dfs.fabric.microsoft.com/lh/Files/proj.

required
spark SparkSession

Active SparkSession. Used only to access _jvm and _jsc; the session is not stored in any long-lived state outside this object.

required

Raises:

Type Description
ValueError

If uri does not contain ://.

join(rel)

Join rel to this URI, normalizing . and .. segments.

Parameters:

Name Type Description Default
rel str

Relative path to append.

required

Returns:

Type Description
ConfigLocation

A new RemoteConfigLocation.

Raises:

Type Description
ValueError

If rel is absolute, contains its own scheme, or walks above the URI authority via .. segments.
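The normalization and the three rejection cases can be illustrated with plain string handling. This is a hypothetical sketch, not RemoteConfigLocation's implementation. join_uri is an illustrative name, and the sketch assumes the base URI has the form scheme://authority/path.

```python
def join_uri(base: str, rel: str) -> str:
    """Hypothetical normalizing join of a relative path onto a URI."""
    if "://" not in base:
        raise ValueError(f"not a URI: {base!r}")
    if rel.startswith("/") or "://" in rel:
        raise ValueError(f"rel must be relative and scheme-free: {rel!r}")
    scheme, rest = base.split("://", 1)
    authority, _, path = rest.partition("/")
    parts = [seg for seg in path.split("/") if seg]
    for seg in rel.split("/"):
        if seg in ("", "."):
            continue  # empty and '.' segments do not move the cursor
        if seg == "..":
            if not parts:
                raise ValueError(f"{rel!r} walks above the URI authority")
            parts.pop()
        else:
            parts.append(seg)
    return f"{scheme}://{authority}/" + "/".join(parts)
```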

exists()

Return whether the underlying URI exists.

Wraps FileSystem.exists. JVM exceptions are propagated as OSError so callers can handle filesystem errors uniformly.

read_text()

Read the file as UTF-8 text via Hadoop FileSystem.open.

Raises:

Type Description
FileNotFoundError

If the underlying file does not exist.

OSError

For any other I/O failure.

is_relative_to(other)

Return whether this URI lies within other.

Comparison is a normalized prefix match on the URI string. Comparing across location types always returns False.
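A plain str.startswith check would wrongly treat .../proj2 as lying inside .../proj. Below is a minimal sketch of a segment-aware prefix match. It assumes both URIs are already normalized (no . or .. segments). The helper is hypothetical.

```python
def uri_is_relative_to(child: str, parent: str) -> bool:
    """Hypothetical segment-aware prefix check for normalized URIs."""
    child, parent = child.rstrip("/"), parent.rstrip("/")
    # Require an exact match or a '/' boundary right after the parent prefix.
    return child == parent or child.startswith(parent + "/")
```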

__str__()

Return the underlying URI.

__repr__()

Developer-friendly representation.

__eq__(other)

Two remote locations are equal when their URIs match.

__hash__()

Hash by URI.

ConfigError

Bases: WeevError

Base exception for configuration-related errors.

__init__(message, cause=None, file_path=None, config_key=None)

Initialize ConfigError.

Parameters:

Name Type Description Default
message str

Human-readable error message

required
cause Exception | None

Optional underlying exception

None
file_path str | None

Path to the config file where the error occurred

None
config_key str | None

Specific config key that caused the error

None

__str__()

Return string representation with context.

DataValidationError

Bases: WeevError

Raised when a fatal-severity validation rule has failing rows.

When any row fails a validation rule declared with severity: fatal, the thread aborts immediately without writing data. Downgrade to severity: error to quarantine failing rows instead of aborting.
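The difference between the two severities can be pictured with plain Python rows. This is a hypothetical sketch. weevr applies rules to Spark DataFrames, and apply_rule, Row and the local DataValidationError stand-in are illustrative only. Other severities are not modeled.

```python
from __future__ import annotations

from typing import Any, Callable


class DataValidationError(Exception):
    """Stand-in for weevr's exception of the same name."""


Row = dict[str, Any]


def apply_rule(
    rows: list[Row], predicate: Callable[[Row], bool], severity: str
) -> tuple[list[Row], list[Row]]:
    """Hypothetical sketch: split rows into (valid, quarantined).

    severity 'fatal' aborts before anything is written; 'error' sets the
    failing rows aside so the valid ones can still be written.
    """
    valid = [r for r in rows if predicate(r)]
    failing = [r for r in rows if not predicate(r)]
    if failing and severity == "fatal":
        raise DataValidationError(f"{len(failing)} row(s) failed a fatal rule")
    return valid, failing
```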

ExecutionError

Bases: WeevError

Base exception for execution-time errors.

Carries optional execution context to pinpoint where a failure occurred within a thread pipeline.

__init__(message, cause=None, thread_name=None, step_index=None, step_type=None, source_name=None)

Initialize ExecutionError.

Parameters:

Name Type Description Default
message str

Human-readable error message.

required
cause Exception | None

Optional underlying exception.

None
thread_name str | None

Name of the thread where the error occurred.

None
step_index int | None

Zero-based index of the pipeline step that failed.

None
step_type str | None

Step type key (e.g. "filter", "join") that failed.

None
source_name str | None

Source alias that caused the error.

None

__str__()

Return string representation with execution context.

ModelValidationError

Bases: ConfigError

Raised when a fully resolved config fails to hydrate into a typed model.

This occurs after variable resolution and inheritance, when the concrete values are validated through the Pydantic domain model (Thread, Weave, or Loom). Semantic constraints that span multiple fields are checked here.

VariableResolutionError

Bases: ConfigError

Raised when a ${variable} reference cannot be resolved.

This occurs when a variable placeholder has no matching entry in the parameter context and no default value (${var:-default}) is provided.

LogLevel

Bases: StrEnum

Configurable log level for weevr execution.

Controls the verbosity of structured logging output during pipeline execution. Maps to Python logging levels internally.

Loom

Bases: FrozenBase

A deployment unit containing weave references with optional shared defaults.

Thread

Bases: FrozenBase

Complete domain model for a thread configuration.

A thread is the smallest unit of work: one or more sources, a sequence of transformation steps, and a single target.

ConditionSpec

Bases: FrozenBase

A condition expression for conditional execution.

Attributes:

Name Type Description
when str

Condition expression string. Supports parameter references (${param.x}), built-in checks (table_exists(), table_empty(), row_count()), and simple boolean operators.

Weave

Bases: FrozenBase

A collection of thread references with optional shared defaults.

ExecutionMode

Bases: StrEnum

Execution modes for Context.run().

Attributes:

Name Type Description
EXECUTE

Full execution — read, transform, write (default).

VALIDATE

Config + DAG + source existence checks, no execution.

PLAN

Build and return execution plans without executing.

PREVIEW

Execute with sampled data, no writes.

LoadedConfig

Wrapper around a hydrated config model returned by Context.load().

Provides access to the underlying model (Thread, Weave, or Loom) and a lazily-built execution plan for weave/loom configs. Proxies attribute access to the model for convenience.

Attributes:

Name Type Description
model Any

The hydrated domain model.

config_type str

Config kind ("thread", "weave", "loom").

config_name str

Name derived from the config file path.

model property

The underlying Thread, Weave, or Loom model.

config_type property

Config kind: "thread", "weave", or "loom".

config_name property

Name derived from the config file path.

execution_plan property

Lazily-built execution plans for weave/loom configs.

Returns None for thread configs. For weave configs, returns a single-element list. For loom configs, returns one plan per weave.

__init__(model, config_type, config_name, threads=None, weaves=None)

Initialize with a hydrated model and its metadata.

flow(*, dark=None)

Return a thread flow diagram for thread configs.

Generates an inline SVG showing the thread's processing pipeline: sources, joins, transforms, and target. Returns None for weave/loom configs (use per-thread flow in result reports instead).

Parameters:

Name Type Description Default
dark bool | None

Force dark (True) or light (False) palette. None uses prefers-color-scheme media query (default).

None

Returns:

Type Description
Any

A weevr.engine.display.FlowDiagram, or None for non-thread configs.

__getattr__(name)

Proxy attribute access to the underlying model.
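The proxy relies on the fact that Python calls __getattr__ only after normal attribute lookup fails. ModelWrapper below is a hypothetical stand-in for this pattern, not LoadedConfig itself.

```python
from typing import Any


class ModelWrapper:
    """Hypothetical stand-in for the LoadedConfig proxy pattern."""

    def __init__(self, model: Any, config_type: str) -> None:
        self._model = model
        self._config_type = config_type

    @property
    def model(self) -> Any:
        return self._model

    @property
    def config_type(self) -> str:
        return self._config_type

    def __getattr__(self, name: str) -> Any:
        # Reached only when normal lookup fails, so the wrapper's own
        # properties take precedence over same-named model attributes.
        if name == "_model":
            # Guard against infinite recursion if _model is not yet set.
            raise AttributeError(name)
        return getattr(self._model, name)
```

One consequence of this design is that a model attribute that shares a name with a wrapper property (for example config_type) is shadowed. It stays reachable through .model.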

RunResult

Unified result returned by Context.run() for all execution modes.

Provides a consistent interface regardless of config type (thread, weave, loom) or execution mode (execute, validate, plan, preview). Mode-specific fields are None when not applicable.

Attributes:

Name Type Description
status

Aggregate outcome — "success", "failure", or "partial".

mode

The execution mode that produced this result.

config_type

Config kind that was executed ("thread", "weave", "loom").

config_name

Name derived from the config file path.

duration_ms

Wall-clock duration in milliseconds.

detail

Underlying engine result (execute mode only).

telemetry

Structured telemetry data (execute mode only).

execution_plan

Resolved execution plans (plan mode only).

preview_data

Output DataFrames keyed by thread name (preview mode only).

validation_errors

Error messages from validation checks (validate mode only).

warnings list[str]

Non-fatal messages (e.g., zero threads matched a filter).

__init__(*, status, mode, config_type, config_name, duration_ms=0, detail=None, telemetry=None, execution_plan=None, preview_data=None, validation_errors=None, warnings=None)

Initialize a RunResult with execution outcome and telemetry.

dag(*, dark=None)

Return a DAG diagram for plan mode results.

Returns a weevr.engine.display.DAGDiagram for plan mode, or None for other modes. For multi-weave loom results, returns a loom-level swimlane DAG.

Parameters:

Name Type Description Default
dark bool | None

Force dark (True) or light (False) palette. None uses prefers-color-scheme media query (default).

None

explain()

Return a detailed text breakdown of the execution plan.

Includes dependency provenance, cache targets, lookup schedule, and per-thread source/target/step summary. Sections with no data are omitted. Returns empty string for non-plan modes.

summary()

Return a formatted, human-readable execution summary.

_ResolvedConfig dataclass

Intermediate container for resolved config data.

Context

Entry point for all weevr operations.

Wraps a SparkSession with a project reference, resolved parameters, and execution configuration. Provides run() for execution and load() for model inspection.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession (required).

required
project str | Path

Project identifier. Accepts three forms:

  • Simple name (e.g., "my_project"): resolved via the default lakehouse at /lakehouse/default/Files/<name>.weevr/.
  • Qualified (with workspace and lakehouse): resolved via OneLake ABFS path.
  • Direct path (e.g., "/path/to/project.weevr"): used as-is.
required
params dict[str, Any] | None

Runtime parameter overrides.

None
log_level str

Logging verbosity — "minimal", "standard" (default), "verbose", or "debug".

'standard'
workspace str | None

Fabric workspace ID for cross-lakehouse resolution.

None
lakehouse str | None

Fabric lakehouse ID for cross-lakehouse resolution.

None

spark property

The underlying SparkSession.

params property

Runtime parameter overrides.

log_level property

Configured logging verbosity.

project_root property

Resolved project root as a ConfigLocation.

Returns a LocalConfigLocation when the project is on the local filesystem (Tier 1 default lakehouse or Tier 3 direct path) and a RemoteConfigLocation when the project is qualified by workspace and lakehouse (Tier 2 OneLake).

__init__(spark, project, *, params=None, log_level='standard', workspace=None, lakehouse=None)

Initialize a Context with a SparkSession and project reference.

load(path)

Load and validate a config file, returning a model wrapper.

Parses, resolves, and hydrates the config at path (relative to the project root) without executing anything. The returned LoadedConfig exposes the underlying model and, for weave/loom configs, a lazily-built execution plan.

Parameters:

Name Type Description Default
path str | Path

Path to a config file, relative to the project root.

required

Returns:

Type Description
LoadedConfig

A LoadedConfig wrapping the hydrated model.

run(path, *, mode='execute', tags=None, threads=None, sample_rows=100)

Run a config file in the specified mode.

Parameters:

Name Type Description Default
path str | Path

Path to a config file, relative to the project root.

required
mode str

Execution mode — "execute" (default), "validate", "plan", or "preview".

'execute'
tags list[str] | None

Run only threads matching any of these tags.

None
threads list[str] | None

Run only threads with these names.

None
sample_rows int

Maximum rows for preview mode (default 100).

100

Returns:

Type Description
RunResult

A RunResult describing the outcome.

Raises:

Type Description
ValueError

If mode is invalid or both tags and threads are provided.
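The two ValueError conditions can be pictured as an up-front argument check. In this hypothetical sketch, Mode mirrors the four ExecutionMode values but uses (str, Enum) instead of StrEnum so it also runs on Python versions before 3.11. check_run_args is not a weevr function.

```python
from __future__ import annotations

from enum import Enum


class Mode(str, Enum):
    EXECUTE = "execute"
    VALIDATE = "validate"
    PLAN = "plan"
    PREVIEW = "preview"


def check_run_args(mode: str, tags: list[str] | None, threads: list[str] | None) -> Mode:
    """Hypothetical validation mirroring the documented run() errors."""
    try:
        parsed = Mode(mode)
    except ValueError:
        valid = ", ".join(m.value for m in Mode)
        raise ValueError(f"invalid mode {mode!r}; expected one of: {valid}") from None
    if tags is not None and threads is not None:
        raise ValueError("pass either tags or threads, not both")
    return parsed
```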

apply_inheritance(loom_defaults, weave_defaults, thread_config, *, loom_audit_templates=None, weave_audit_templates=None, loom_connections=None, weave_connections=None)

Apply multi-level inheritance cascade.

Cascade order (lowest to highest priority):

  1. loom_defaults (lowest)
  2. weave_defaults
  3. thread_config (highest)
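Here is a minimal sketch of a three-layer cascade. It assumes nested dicts are merged key by key, while lists and scalars from a higher layer replace lower ones. This section does not say whether apply_inheritance deep-merges nested sections, or how it treats audit templates and connections. Treat cascade and deep_merge as hypothetical helpers.

```python
from __future__ import annotations

import copy
from typing import Any


def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict where override wins; nested dicts merge recursively."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            # Scalars and lists from the higher-priority layer replace the lower one.
            merged[key] = copy.deepcopy(value)
    return merged


def cascade(
    loom_defaults: dict[str, Any] | None,
    weave_defaults: dict[str, Any] | None,
    thread_config: dict[str, Any],
) -> dict[str, Any]:
    """Apply layers from lowest to highest priority."""
    result: dict[str, Any] = {}
    for layer in (loom_defaults, weave_defaults, thread_config):
        result = deep_merge(result, layer or {})
    return result
```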

Parameters:

Name Type Description Default
loom_defaults dict[str, Any] | None

Defaults from loom level

required
weave_defaults dict[str, Any] | None

Defaults from weave level

required
thread_config dict[str, Any]

Thread-specific config

required
loom_audit_templates dict[str, Any] | None

User-defined audit template definitions from loom

None
weave_audit_templates dict[str, Any] | None

User-defined audit template definitions from weave

None
loom_connections dict[str, Any] | None

Named connection definitions from loom top-level

None
weave_connections dict[str, Any] | None

Named connection definitions from weave top-level

None

Returns:

Type Description
dict[str, Any]

Fully merged config with thread values taking precedence

expand_foreach(steps)

Expand foreach macro blocks into repeated step sequences.

Each foreach block in the steps list is replaced by its template steps repeated once per value, with {var} placeholders substituted.

Non-foreach entries pass through unchanged.
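The expansion can be sketched as follows. The shape of a foreach block used here ({"foreach": {"values": [...], "as": <var>, "steps": [...]}}) is an assumption for illustration only. Consult the weevr config reference for the real field names. ConfigError is a local stand-in.

```python
from __future__ import annotations

from typing import Any


class ConfigError(Exception):
    """Stand-in for weevr's ConfigError."""


def _substitute(obj: Any, var: str, value: Any) -> Any:
    """Replace {var} in every string nested inside obj."""
    if isinstance(obj, str):
        return obj.replace("{" + var + "}", str(value))
    if isinstance(obj, dict):
        return {k: _substitute(v, var, value) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_substitute(v, var, value) for v in obj]
    return obj


def expand(steps: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Hypothetical foreach expansion; other steps pass through unchanged."""
    out: list[dict[str, Any]] = []
    for step in steps:
        block = step.get("foreach")
        if block is None:
            out.append(step)
            continue
        missing = {"values", "as", "steps"} - set(block)
        if missing:
            raise ConfigError(f"foreach block missing fields: {sorted(missing)}")
        for value in block["values"]:
            for template in block["steps"]:
                out.append(_substitute(template, block["as"], value))
    return out
```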

Parameters:

Name Type Description Default
steps list[dict[str, Any]]

Raw step list (dicts), possibly containing foreach blocks.

required

Returns:

Type Description
list[dict[str, Any]]

Expanded step list with all foreach blocks replaced.

Raises:

Type Description
ConfigError

If a foreach block is missing required fields.

detect_config_type_from_extension(path)

Detect config type from file extension.

Parameters:

Name Type Description Default
path str | Path | ConfigLocation

Path or location of the config file.

required

Returns:

Type Description
str | None

The config type string if the extension is a typed extension (.thread, .weave, .loom), or None for .yaml/.yml files.

Raises:

Type Description
ConfigError

If the extension is .yaml or .yml and a typed extension was expected (caller context).

extract_config_version(raw)

Extract and parse config_version from a config dict.

Parameters:

Name Type Description Default
raw dict[str, Any]

Parsed config dictionary

required

Returns:

Type Description
tuple[int, int]

Tuple of (major, minor) version numbers

Raises:

Type Description
ConfigParseError

If config_version is missing or has an invalid format.

parse_yaml(path)

Parse a YAML file and return its contents.

Parameters:

Name Type Description Default
path str | Path | ConfigLocation

A local path, a URI string, or a ConfigLocation. Local path inputs are wrapped in a LocalConfigLocation automatically. Remote URIs require the caller to construct a RemoteConfigLocation themselves so they can supply the active SparkSession.

required

Returns:

Type Description
dict[str, Any]

Parsed YAML content as a dictionary.

Raises:

Type Description
ConfigParseError

If the file is not found, unreadable, or contains invalid YAML syntax.

validate_config_version(version, config_type)

Validate that the config version is supported.

Parameters:

Name Type Description Default
version tuple[int, int]

Tuple of (major, minor) version

required
config_type str

Type of config (thread, weave, loom, params)

required

Raises:

Type Description
ConfigVersionError

If the major version does not match the supported major version.
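Parsing config_version into (major, minor) and then checking only the major number can be sketched as below. SUPPORTED_MAJOR, parse_version, check_version and the two exception stand-ins are hypothetical. The "major.minor" string format is an assumption.

```python
from __future__ import annotations

from typing import Any

SUPPORTED_MAJOR = 1  # hypothetical supported major version


class ConfigParseError(Exception):
    """Stand-in for weevr's ConfigParseError."""


class ConfigVersionError(Exception):
    """Stand-in for weevr's ConfigVersionError."""


def parse_version(raw: dict[str, Any]) -> tuple[int, int]:
    value = raw.get("config_version")
    if value is None:
        raise ConfigParseError("config_version is missing")
    # Note: an unquoted YAML value like 1.10 loads as the float 1.1, which is
    # why this sketch works best with a quoted "major.minor" string.
    parts = str(value).split(".")
    if len(parts) != 2 or not all(p.isdigit() for p in parts):
        raise ConfigParseError(f"invalid config_version: {value!r}")
    return int(parts[0]), int(parts[1])


def check_version(version: tuple[int, int], config_type: str) -> None:
    major, _minor = version  # minor differences are tolerated
    if major != SUPPORTED_MAJOR:
        raise ConfigVersionError(f"{config_type} config_version {major}.x is not supported")
```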

build_param_context(runtime_params=None, config_defaults=None, fabric_context=None, entry_params=None)

Build parameter context with proper priority layering.

Priority order (highest to lowest):

  1. runtime_params
  2. entry_params (nested under the param key for ${param.x} access)
  3. config_defaults
  4. fabric_context
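The layering can be sketched by applying the layers from lowest to highest priority, so later updates win. This sketch assumes the context is a flat dict whose keys may contain dots, as in fabric.workspace_id and param.x. The real function may store nested dicts instead. build_context is a hypothetical name.

```python
from __future__ import annotations

from typing import Any


def build_context(
    runtime: dict[str, Any] | None = None,
    config_defaults: dict[str, Any] | None = None,
    fabric: dict[str, Any] | None = None,
    entry_params: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Hypothetical flat-key version of the documented priority layering."""
    ctx: dict[str, Any] = {}
    # Lowest priority first; fabric values that are None are omitted.
    ctx.update({k: v for k, v in (fabric or {}).items() if v is not None})
    ctx.update(config_defaults or {})
    # Entry-level params live under the 'param.' namespace.
    ctx.update({f"param.{k}": v for k, v in (entry_params or {}).items()})
    ctx.update(runtime or {})  # runtime overrides win over everything
    return ctx
```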

Parameters:

Name Type Description Default
runtime_params dict[str, Any] | None

Runtime parameter overrides.

None
config_defaults dict[str, Any] | None

Default parameters from config.

None
fabric_context dict[str, Any] | None

Fabric environment values keyed as fabric.<field> (e.g. fabric.workspace_id). None values are omitted. Lowest priority — overridden by config_defaults and runtime_params.

None
entry_params dict[str, Any] | None

ThreadEntry-level parameters injected under the param namespace for ${param.x} dotted-key resolution.

None

Returns:

Type Description
dict[str, Any]

Merged parameter context dictionary with dotted key access support.

resolve_declared_params(param_specs, runtime_params, *, file_path=None)

Resolve the params: block declared at loom or weave level to a flat {name: value} dict.

Precedence per declared param:

  1. Value from runtime_params if supplied
  2. ParamSpec.default if set on the spec
  3. ConfigSchemaError if the param is required
  4. Omitted from the result if optional with no default

The returned dict is intended to bind under the param.* namespace via build_param_context's entry_params argument, so that ${param.x} expressions resolve to the layered value.

Runtime keys not declared in param_specs are ignored — declared scope is the only contract honored at this layer.
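Here is a minimal sketch of the four-way precedence. It takes specs in their model_dump dict form and assumes each has "required" and "default" keys, where a None default means "no default". resolve_declared and the ConfigSchemaError stand-in are hypothetical.

```python
from __future__ import annotations

from typing import Any


class ConfigSchemaError(Exception):
    """Stand-in for weevr's ConfigSchemaError."""


def resolve_declared(
    specs: dict[str, dict[str, Any]] | None,
    runtime: dict[str, Any] | None,
) -> dict[str, Any]:
    runtime = runtime or {}
    resolved: dict[str, Any] = {}
    for name, spec in (specs or {}).items():
        if name in runtime:
            resolved[name] = runtime[name]  # 1. runtime value
        elif spec.get("default") is not None:
            resolved[name] = spec["default"]  # 2. spec default
        elif spec.get("required", False):
            raise ConfigSchemaError(f"required param {name!r} has no value")  # 3.
        # 4. optional with no default: omitted
    return resolved  # undeclared runtime keys never appear here
```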

Parameters:

Name Type Description Default
param_specs dict[str, Any] | None

Mapping of declared params (ParamSpec instances or their model_dump dicts). None or empty returns {}.

required
runtime_params dict[str, Any] | None

Caller-supplied values keyed by param name.

required
file_path str | None

Optional originating config file path for error messages.

None

Returns:

Type Description
dict[str, Any]

Resolved {name: value} dict ready for entry_params.

Raises:

Type Description
ConfigSchemaError

A required declared param has no runtime value and no spec default. The message includes the file path and param name.

resolve_references(config, config_type, project_root, runtime_params=None, visited=None)

Resolve references to other config files.

Handles both external references (ref key) and inline definitions (name + body keys). Recursively loads referenced configs with circular reference detection.

Parameters:

Name Type Description Default
config dict[str, Any]

Config dict to resolve references in.

required
config_type str

Type of this config ('weave' or 'loom').

required
project_root ConfigLocation | Path

The .weevr project directory. A bare pathlib.Path is wrapped in a LocalConfigLocation for backward compatibility.

required
runtime_params dict[str, Any] | None

Runtime parameters to pass to child configs.

None
visited set[str] | None

Set of already-visited ref strings (for cycle detection).

None

Returns:

Type Description
dict[str, Any]

Config dict with resolved child configs attached under the '_resolved_threads' or '_resolved_weaves' key.

Raises:

Type Description
ReferenceResolutionError

If referenced file not found or circular reference detected.

ConfigError

If an inline definition is missing a name field.

resolve_variables(config, context, consumed_keys=None)

Recursively resolve variable references in config.

Supports:

  • ${var}: simple variable reference (raises if not found)
  • ${var:-default}: variable with a fallback default
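The two placeholder forms can be sketched with a single regular expression applied recursively through dicts and lists. This is a hypothetical sketch, not weevr's resolver. It looks up keys (including dotted ones like param.table) in a flat context and always substitutes the value as a string. The real resolver may preserve types when a placeholder is the whole value.

```python
from __future__ import annotations

import re
from typing import Any

# ${name} or ${name:-default}; the default may be empty.
_VAR = re.compile(r"\$\{([^}:]+)(?::-([^}]*))?\}")


class VariableResolutionError(Exception):
    """Stand-in for weevr's exception of the same name."""


def resolve_vars(config: Any, context: dict[str, Any]) -> Any:
    if isinstance(config, dict):
        return {k: resolve_vars(v, context) for k, v in config.items()}
    if isinstance(config, list):
        return [resolve_vars(v, context) for v in config]
    if not isinstance(config, str):
        return config  # numbers, booleans, None pass through

    def _sub(match: re.Match) -> str:
        key, default = match.group(1), match.group(2)
        if key in context:
            return str(context[key])
        if default is not None:
            return default
        raise VariableResolutionError(f"unresolved variable: ${{{key}}}")

    return _VAR.sub(_sub, config)
```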

Parameters:

Name Type Description Default
config dict[str, Any] | list[Any] | str | Any

Config structure to resolve (dict, list, str, or primitive)

required
context dict[str, Any]

Parameter context for variable lookup

required
consumed_keys set[str] | None

Optional set to track which context keys were consumed during resolution. When provided, each resolved dotted key is added to the set for post-resolution unused-param analysis.

None

Returns:

Type Description
Any

Config with all variables resolved

Raises:

Type Description
VariableResolutionError

If variable not found and no default provided

validate_params(param_specs, context)

Validate parameters against their type specifications.

Parameters:

Name Type Description Default
param_specs dict[str, Any] | None

Parameter specifications from config

required
context dict[str, Any]

Actual parameter values

required

Raises:

Type Description
ConfigSchemaError

If a required param is missing or a value does not match its declared type.

validate_schema(raw, config_type)

Validate a raw config dict against the appropriate pre-resolution schema.

Parameters:

Name Type Description Default
raw dict[str, Any]

Raw config dictionary (variables not yet resolved)

required
config_type str

Type of config (thread, weave, loom, params)

required

Returns:

Type Description
BaseModel

Validated Pydantic model instance

Raises:

Type Description
ConfigSchemaError

If validation fails

is_table_alias(path)

Return True if path looks like a table alias (e.g. schema.table).

Table aliases are dot-separated identifiers resolved by the Spark metastore. File paths contain slashes or URI schemes (://).
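The distinction can be sketched as a simple heuristic. This hypothetical helper treats anything with a slash or a scheme as a file path. It requires at least two dot-separated Python-style identifiers, which is an assumption. Quoted or hyphenated names, which a metastore may accept, are not handled.

```python
def looks_like_table_alias(path: str) -> bool:
    """Hypothetical heuristic: 'schema.table' style names, not file paths."""
    if "://" in path or "/" in path or "\\" in path:
        return False  # URIs and filesystem paths
    parts = path.split(".")
    return len(parts) >= 2 and all(p.isidentifier() for p in parts)
```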

execute_thread(spark, thread, collector=None, parent_span_id=None, cached_lookups=None, weave_lookups=None, resolved_params=None, loom_name='', weave_name='', column_sets=None, column_set_defs=None, connections=None)

Execute a single thread from sources through transforms to a Delta target.

Execution order:

  1. Initialize thread-level resources (variables, lookups, column_sets).
  2. Run thread-level pre_steps.
  3. Resolve lookup-based sources from cached or on-demand DataFrames.
  4. Read all remaining declared sources into DataFrames.
     • For incremental_watermark: load the prior high-water mark (HWM), apply the filter, and capture the new HWM.
     • For cdc: read via the Delta Change Data Feed (CDF) or a generic CDC source.
  5. Set the primary (first) source as the working DataFrame.
  6. Run pipeline steps against the working DataFrame.
  7. Run validation rules (if configured); quarantine failing rows or abort.
  8. Apply naming normalization (if configured).
  9. Compute business keys and hashes (if configured).
  10. Resolve the target write path.
  11. Apply target column mapping.
  12. Inject audit columns (if configured; bypasses mapping mode).
  13. Write to the Delta target.
     • For cdc: use CDC merge routing instead of the standard write.
  14. Persist watermark state (if applicable).
  15. Run post-write assertions (if configured).
  16. Write exports (secondary outputs, if configured).
  17. Run thread-level post_steps.
  18. Build telemetry and return a ThreadResult.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
thread Thread

Thread configuration to execute.

required
collector SpanCollector | None

Optional span collector for telemetry. When provided, a thread span is created and finalized.

None
parent_span_id str | None

Optional parent span ID for trace tree linkage.

None
cached_lookups dict[str, Any] | None

Pre-materialized lookup DataFrames keyed by lookup name. Runtime type is dict[str, DataFrame]; Any avoids coupling the signature to PySpark types.

None
weave_lookups dict[str, Lookup] | None

Weave-level lookup definitions for on-demand resolution of non-materialized lookups.

None
resolved_params dict[str, Any] | None

Runtime parameter values for telemetry capture. Stored on ThreadTelemetry for thread-level runs.

None
loom_name str

Loom name for audit column context variables.

''
weave_name str

Weave name for audit column context variables. Derived from the prefix of thread.qualified_key when not provided. Falls back to empty string if no dot separator exists.

''
column_sets dict[str, dict[str, str]] | None

Pre-resolved column set mappings keyed by name. Passed through to run_pipeline so rename steps can look up their resolved dictionaries.

None
column_set_defs dict[str, ColumnSet] | None

Column set model instances keyed by name. Passed through to run_pipeline so rename steps can read on_unmapped and on_extra settings.

None
connections dict[str, OneLakeConnection] | None

Named connection declarations keyed by connection name. When provided, source reads, lookup materialization, target path resolution, and export writes use connection-based paths where configured.

None

Returns:

Type Description
ThreadResult

A ThreadResult with status, row count, write mode, target path, and optional telemetry.

Raises:

Type Description
ExecutionError

If any step fails, with thread_name set on the error.

DataValidationError

If a fatal-severity validation rule fails.

ExportError

If an export with on_failure: abort fails.

build_plan(weave_name, threads, thread_entries, lookups=None)

Build an execution plan from a weave's threads and declared/inferred dependencies.

Performs DAG construction, cycle detection, topological sort, and cache analysis to produce an immutable ExecutionPlan.

When lookups are provided, the planner also:

  • Infers implicit dependencies between threads mediated by lookups (a thread consuming a lookup whose source is produced by another thread depends on that producer).
  • Computes a lookup schedule mapping each execution group boundary to the lookups that should be materialized before that group runs.
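The DAG, cycle-detection and topological-sort stages can be illustrated with a level-by-level variant of Kahn's algorithm. Each level is a group of threads whose dependencies are all satisfied, so they could run concurrently. This is a hypothetical sketch (cache analysis and lookup inference are not modeled). parallel_groups and the ConfigError stand-in are illustrative names.

```python
from __future__ import annotations


class ConfigError(Exception):
    """Stand-in for weevr's ConfigError."""


def parallel_groups(deps: dict[str, set[str]]) -> list[list[str]]:
    """deps maps each thread name to the thread names it depends on."""
    remaining = {name: set(d) for name, d in deps.items()}
    unknown = {d for ds in remaining.values() for d in ds} - set(remaining)
    if unknown:
        raise ConfigError(f"unknown dependencies: {sorted(unknown)}")
    groups: list[list[str]] = []
    while remaining:
        ready = sorted(n for n, d in remaining.items() if not d)
        if not ready:
            # Every remaining thread still waits on another: a cycle exists.
            raise ConfigError(f"circular dependency among: {sorted(remaining)}")
        groups.append(ready)
        for name in ready:
            del remaining[name]
        for d in remaining.values():
            d.difference_update(ready)
    return groups
```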

Parameters:

Name Type Description Default
weave_name str

Name of the weave being planned.

required
threads dict[str, Thread]

Mapping of thread name to Thread config.

required
thread_entries list[ThreadEntry]

Ordered thread entries from the weave config, which may carry explicit dependency declarations.

required
lookups dict[str, Lookup] | None

Optional weave-level lookup definitions. When provided, lookup-mediated dependencies are inferred and a materialization schedule is computed.

None

Returns:

Type Description
ExecutionPlan

An immutable ExecutionPlan.

Raises:

Type Description
ConfigError

If a circular dependency is detected (with cycle path in the message), or if an explicit dependency references a thread that does not exist in threads.

execute_loom(spark, loom, weaves, threads, params=None)

Execute weaves sequentially in declared order.

Iterates the weaves listed in loom.weaves, builds a weevr.engine.planner.ExecutionPlan for each, and executes them via execute_weave. Creates a root span collector for telemetry and passes it through the execution tree.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
loom Loom

Loom configuration declaring the ordered list of weave names.

required
weaves dict[str, Weave]

Mapping of weave name to weevr.model.weave.Weave config.

required
threads dict[str, dict[str, Thread]]

Nested mapping of weave_name → thread_name → Thread config.

required
params dict[str, Any] | None

Parameters for condition evaluation at weave and thread levels.

None

Returns:

Type Description
LoomResult

A weevr.engine.result.LoomResult with aggregate status and per-weave results.

execute_weave(spark, plan, threads, collector=None, parent_span_id=None, thread_conditions=None, params=None, weave_span_label=None, pre_steps=None, post_steps=None, lookups=None, variables=None, loom_name='', weave_name='', column_set_defs=None, pre_cached_lookups=None, connections=None)

Execute threads according to the execution plan.

Processes each parallel group sequentially, submitting threads within a group to a concurrent.futures.ThreadPoolExecutor for concurrent execution. Respects the on_failure config on each thread and manages the cache lifecycle via weevr.engine.cache_manager.CacheManager.

When hooks are provided, the lifecycle is:

  1. Initialize VariableContext from variables.
  2. Execute pre_steps.
  3. Materialize lookups (materialize=True).
  4. Materialize column sets.
  5. Execute threads (with cached lookup DataFrames).
  6. Execute post_steps.
  7. Clean up lookups.
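The group-by-group scheduling (groups in sequence, threads within a group in parallel) can be sketched with concurrent.futures. run_groups and run_thread are hypothetical names. The sketch records a failure string instead of applying on_failure policies, and it omits hooks, lookups and caching.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def run_groups(
    groups: list[list[str]],
    run_thread: Callable[[str], str],
    max_workers: int = 4,
) -> dict[str, str]:
    results: dict[str, str] = {}
    for group in groups:  # groups run one after another
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {name: pool.submit(run_thread, name) for name in group}
        # Leaving the with-block waits for every future in this group.
        for name, future in futures.items():
            try:
                results[name] = future.result()
            except Exception as exc:  # a failed thread does not stop its siblings
                results[name] = f"failure: {exc}"
    return results
```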

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
plan ExecutionPlan

Immutable weevr.engine.planner.ExecutionPlan produced by weevr.engine.planner.build_plan.

required
threads dict[str, Thread]

Mapping of thread name to weevr.model.thread.Thread config.

required
collector SpanCollector | None

Optional span collector for telemetry. When provided, per-thread collectors are created and merged after execution.

None
parent_span_id str | None

Optional parent span ID for trace tree linkage.

None
thread_conditions dict[str, ConditionSpec] | None

Mapping of thread name to condition spec. Threads with a condition that evaluates to False are skipped.

None
params dict[str, Any] | None

Parameters for condition evaluation.

None
weave_span_label str | None

Label for the weave telemetry span. When set, used instead of plan.weave_name. Typically the weave's qualified key.

None
pre_steps Sequence[HookStep] | None

Optional pre-execution hook steps.

None
post_steps Sequence[HookStep] | None

Optional post-execution hook steps.

None
lookups dict[str, Lookup] | None

Optional weave-level lookup definitions.

None
variables dict[str, VariableSpec] | None

Optional weave-level variable specs.

None
loom_name str

Loom name passed to threads for audit column context.

''
weave_name str

Weave name passed to threads for audit column context.

''
column_set_defs dict[str, ColumnSet] | None

Merged column set definitions (loom-level definitions merged with weave-level ones; the weave-level definition wins on conflict), materialized and forwarded to each thread to resolve rename steps.

None
pre_cached_lookups dict[str, Any] | None

Already-materialized lookup DataFrames from a parent scope (e.g. loom level). Seeded into the weave's cached lookup dict so threads can reference them without re-materializing.

None
connections dict[str, OneLakeConnection] | None

Merged loom-level and weave-level connections, forwarded to materialize_column_sets so that connection-backed column sets can resolve their lakehouse target.

None

Returns:

Type Description
WeaveResult

A WeaveResult (weevr.engine.result.WeaveResult) with aggregate status and per-thread results.

configure_logging(log_level=LogLevel.STANDARD, logger_name='weevr')

Configure a weevr logger with structured JSON output.

Sets up a logger with a StructuredJsonFormatter handler, at the Python logging level that corresponds to the given weevr LogLevel.

Parameters:

Name Type Description Default
log_level LogLevel

weevr log level to configure.

STANDARD
logger_name str

Name of the logger to configure.

'weevr'

Returns:

Type Description
Logger

Configured logger instance.
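For readers unfamiliar with structured logging, the sketch below shows the general pattern with Python's logging module: a handler whose formatter emits one JSON object per record. JsonLineFormatter and configure_json_logger are hypothetical stand-ins, not weevr's classes; the JSON field names are assumptions, and the sketch takes a Python logging level directly because the mapping from weevr's LogLevel values is not documented here.

```python
import json
import logging


class JsonLineFormatter(logging.Formatter):
    """Hypothetical stand-in for a structured formatter: one JSON object per record."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),  # applies %-style args to msg
        }
        return json.dumps(payload)


def configure_json_logger(name: str = "weevr_demo", level: int = logging.INFO) -> logging.Logger:
    """Attach a single JSON-emitting stream handler to the named logger."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonLineFormatter())
    logger.handlers = [handler]  # replace rather than stack handlers on repeat calls
    return logger
```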


Result Types

weevr.result

Unified result types for the weevr Python API.

ExecutionPlan

Bases: FrozenBase

Immutable execution plan produced by the planner.

Attributes:

Name Type Description
weave_name str

Name of the weave this plan was built for.

threads list[str]

All thread names in the weave.

dependencies dict[str, list[str]]

Maps each thread to the list of threads it depends on.

dependents dict[str, list[str]]

Maps each thread to the list of threads that depend on it.

execution_order list[list[str]]

Parallel execution groups in topological order. Each inner list contains threads that can run concurrently.

cache_targets list[str]

Thread names whose output should be cached because they feed two or more downstream threads.

inferred_dependencies dict[str, list[str]]

The auto-inferred subset of dependencies, determined by matching target paths to source paths across threads.

explicit_dependencies dict[str, list[str]]

The explicitly declared subset of dependencies, sourced from ThreadEntry.dependencies in the weave config.

lookup_schedule dict[int, list[str]] | None

Maps execution group indices to lookups that should be materialized at that boundary. Key 0 means before the first group; key N (N > 0) means after group N-1 finishes. None when build_plan was called without lookups.

lookup_producers dict[str, str | None] | None

Maps lookup names to the thread that produces their source data, or None for external lookups. Only populated when build_plan was called with lookups.

lookup_consumers dict[str, list[str]] | None

Maps lookup names to the list of threads that consume each lookup. Only populated when build_plan was called with lookups.
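The relationships among dependencies, dependents, execution_order, cache_targets, and lookup_schedule can be illustrated with a level-by-level topological sort (Kahn's algorithm). This is a sketch under assumptions: plan_groups and boundary_events are hypothetical helpers, threads within a group are sorted alphabetically only to make the output deterministic, and the real planner (build_plan) may order or validate differently.

```python
from collections import defaultdict


def plan_groups(threads: list[str], dependencies: dict[str, list[str]]):
    """Derive dependents, parallel groups, and cache targets from a dependency map."""
    dependents: dict[str, list[str]] = defaultdict(list)
    remaining = {t: set(dependencies.get(t, [])) for t in threads}
    for thread, deps in remaining.items():
        for dep in deps:
            dependents[dep].append(thread)

    execution_order: list[list[str]] = []
    while remaining:
        # Every thread whose dependencies are all satisfied can run in this group.
        ready = sorted(t for t, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError(f"cycle detected among: {sorted(remaining)}")
        execution_order.append(ready)
        for t in ready:
            del remaining[t]
        for deps in remaining.values():
            deps.difference_update(ready)

    # Cache outputs that feed two or more downstream threads.
    cache_targets = sorted(t for t, ds in dependents.items() if len(ds) >= 2)
    return dict(dependents), execution_order, cache_targets


def boundary_events(execution_order: list[list[str]], lookup_schedule: dict[int, list[str]]) -> list[str]:
    """Interleave lookups with groups: key 0 runs before group 0, key N after group N-1."""
    events: list[str] = []
    for i, group in enumerate(execution_order):
        events += [f"lookup:{name}" for name in lookup_schedule.get(i, [])]
        events.append(f"group:{','.join(group)}")
    # A key equal to the number of groups would run after the last group finishes.
    events += [f"lookup:{name}" for name in lookup_schedule.get(len(execution_order), [])]
    return events
```

For example, with clean depending on raw and both dim and fact depending on clean, the groups are [["raw"], ["clean"], ["dim", "fact"]] and clean is the only cache target.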

dag(*, dark=None)

Return an inline SVG DAG diagram of this execution plan.

The returned DAGDiagram renders automatically in notebooks via _repr_svg_() and can be exported with save().

Parameters:

Name Type Description Default
dark bool | None

Force a dark (True) or light (False) palette. None (the default) follows the prefers-color-scheme media query.

None

LoomResult

Bases: FrozenBase

Immutable record of a completed loom execution.

Attributes:

Name Type Description
status Literal['success', 'failure', 'partial']

Aggregate outcome: "success" if all weaves succeeded, "failure" if the loom stopped after a weave failure, "partial" if some weaves succeeded before a failure.

loom_name str

Name of the loom that was executed.

weave_results list[WeaveResult]

Results for each weave that was executed.

duration_ms int

Wall-clock duration of the loom execution in milliseconds.

telemetry LoomTelemetry | None

Loom-level telemetry aggregated from weave telemetry.

ThreadResult

Bases: FrozenBase

Immutable record of a completed thread execution.

Attributes:

Name Type Description
status Literal['success', 'failure', 'skipped']

Outcome of the execution — "success", "failure", or "skipped" (when a condition evaluated to False).

thread_name str

Name of the thread that was executed.

rows_written int

Number of rows in the DataFrame at write time.

write_mode str

The write mode used ("overwrite", "append", or "merge").

target_path str

Physical path of the Delta table that was written.

telemetry ThreadTelemetry | None

Thread-level telemetry with validation/assertion results and row counts.

skip_reason str | None

The condition expression that caused the thread to be skipped.

error str | None

Error message when the thread failed, None on success or skip.

output_schema list[tuple[str, str]] | None

Column names and Spark data types captured from the output DataFrame before write. Each tuple is (column_name, type_string).

samples dict[str, list[dict[str, Any]]] | None

Data samples captured before write, keyed by category. Contains "output" (up to 10 rows) and optionally "quarantine" (up to 10 rows from the quarantine DataFrame).

drift_report dict[str, Any] | None

Schema drift report captured before write when schema drift detection is active. None when no drift handling is configured.

warp_findings list[dict[str, str]] | None

Warp enforcement findings from this run — each entry names a column and the mismatch reason. None when warp enforcement is not configured for the target.

WeaveResult

Bases: FrozenBase

Immutable record of a completed weave execution.

Attributes:

Name Type Description
status Literal['success', 'failure', 'partial', 'skipped']

Aggregate outcome — "success" if all threads succeeded or were conditionally skipped (no failures), "failure" if all threads failed or were skipped due to upstream failure, "partial" if some threads succeeded and some failed, "skipped" if the weave was conditionally skipped at loom level.

weave_name str

Name of the weave that was executed.

thread_results list[ThreadResult]

Results for each thread that was executed (not skipped).

threads_skipped list[str]

Names of threads that were skipped due to upstream failure.

duration_ms int

Wall-clock duration of the weave execution in milliseconds.

telemetry WeaveTelemetry | None

Weave-level telemetry aggregated from thread telemetry.

skip_reason str | None

The condition expression that caused the weave to be skipped.
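The documented WeaveResult.status rules can be expressed as a small function. This is a hypothetical sketch, not weevr's code; in particular, classifying a mix of conditional skips and failures with no successes as "failure" is an assumption, since the rules above do not cover that case.

```python
def weave_status(
    thread_statuses: list[str],
    threads_skipped_upstream: int = 0,
    skipped_by_loom_condition: bool = False,
) -> str:
    """Sketch of the WeaveResult.status rules.

    thread_statuses: ThreadResult.status values ("success", "failure", "skipped").
    threads_skipped_upstream: count of threads skipped because an upstream thread failed.
    """
    if skipped_by_loom_condition:
        return "skipped"
    failures = thread_statuses.count("failure")
    successes = thread_statuses.count("success")
    if failures == 0 and threads_skipped_upstream == 0:
        return "success"  # everything succeeded or was conditionally skipped
    if successes == 0:
        return "failure"  # nothing succeeded (assumed to include conditional skips)
    return "partial"
```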

LoomTelemetry

Bases: FrozenBase

Telemetry data composed into a LoomResult.

Attributes:

Name Type Description
span ExecutionSpan

The execution span for this loom.

weave_telemetry dict[str, WeaveTelemetry]

Per-weave telemetry keyed by weave name.

resolved_params dict[str, Any] | None

Runtime parameter values that drove this execution. Populated on the outermost telemetry object only (loom-level runs).

ThreadTelemetry

Bases: FrozenBase

Telemetry data composed into a ThreadResult.

Attributes:

Name Type Description
span ExecutionSpan

The execution span for this thread.

validation_results list[ValidationResult]

Results of pre-write validation rules.

assertion_results list[AssertionResult]

Results of post-write assertions.

rows_read int

Total rows read from sources.

rows_written int

Rows written to the target.

rows_quarantined int

Rows written to the quarantine table.

rows_after_transforms int

Row count after transforms, before validation.

load_mode str | None

Load mode used (full, incremental_watermark, etc.).

watermark_column str | None

Watermark column name, if applicable.

watermark_previous_value str | None

High-water mark (HWM) value from before this run.

watermark_new_value str | None

New HWM value captured during this run.

watermark_persisted bool

Whether watermark state was successfully persisted.

watermark_first_run bool

Whether this was the first run (no prior state).

cdc_inserts int | None

Number of CDC insert operations.

cdc_updates int | None

Number of CDC update operations.

cdc_deletes int | None

Number of CDC delete operations.

resolved_params dict[str, Any] | None

Runtime parameter values that drove this execution. Populated on the outermost telemetry object only (thread-level runs).

audit_columns_applied list[str]

Names of audit columns injected into the output DataFrame for this thread.

export_results list[ExportResult]

Per-export write results, one per export configured on the thread.

WeaveTelemetry

Bases: FrozenBase

Telemetry data composed into a WeaveResult.

Attributes:

Name Type Description
span ExecutionSpan

The execution span for this weave.

thread_telemetry dict[str, ThreadTelemetry]

Per-thread telemetry keyed by thread name.

hook_results list[Any]

Results from pre/post hook step execution.

lookup_results list[Any]

Results from lookup materialization.

column_set_results list[ColumnSetResult]

Results from column set resolution.

variables dict[str, Any]

Final variable values at end of weave execution.

resolved_params dict[str, Any] | None

Runtime parameter values that drove this execution. Populated on the outermost telemetry object only (weave-level runs).

FormattedText

Bases: str

String subclass that displays without quotes in REPLs and notebooks.

Overrides __repr__ so that interactive environments (Python REPL, Jupyter, Fabric notebooks) render the text with real newlines instead of showing escaped \n sequences inside quotes.

Behaves identically to str in all other contexts — comparisons, slicing, formatting, print(), and serialization are unaffected.

__repr__()

Return the string value without quoting or escaping.
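A minimal sketch of the FormattedText idea, assuming only what is described above: a str subclass whose __repr__ returns the raw text. It is an illustration, not weevr's source.

```python
class FormattedText(str):
    """Sketch: a str whose repr is the text itself, so REPLs show real newlines."""

    def __repr__(self) -> str:
        # str.__str__ returns a plain str copy, so this does not recurse into __repr__.
        return str.__str__(self)
```

As with any str subclass, string methods and slicing return plain str objects in this sketch, so the quote-free repr applies only to the FormattedText instance itself.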

ExecutionMode

Bases: StrEnum

Execution modes for Context.run().

Attributes:

Name Type Description
EXECUTE

Full execution — read, transform, write (default).

VALIDATE

Config + DAG + source existence checks, no execution.

PLAN

Build and return execution plans without executing.

PREVIEW

Execute with sampled data, no writes.

RunResult

Unified result returned by Context.run() for all execution modes.

Provides a consistent interface regardless of config type (thread, weave, loom) or execution mode (execute, validate, plan, preview). Mode-specific fields are None when not applicable.

Attributes:

Name Type Description
status

Aggregate outcome — "success", "failure", or "partial".

mode

The execution mode that produced this result.

config_type

Config kind that was executed ("thread", "weave", "loom").

config_name

Name derived from the config file path.

duration_ms

Wall-clock duration in milliseconds.

detail

Underlying engine result (execute mode only).

telemetry

Structured telemetry data (execute mode only).

execution_plan

Resolved execution plans (plan mode only).

preview_data

Output DataFrames keyed by thread name (preview mode only).

validation_errors

Error messages from validation checks (validate mode only).

warnings list[str]

Non-fatal messages (e.g., zero threads matched a filter).

__init__(*, status, mode, config_type, config_name, duration_ms=0, detail=None, telemetry=None, execution_plan=None, preview_data=None, validation_errors=None, warnings=None)

Initialize a RunResult with execution outcome and telemetry.

dag(*, dark=None)

Return a DAG diagram for plan mode results.

Returns a DAGDiagram (weevr.engine.display.DAGDiagram) for plan mode, or None for other modes. For loom results with multiple weaves, returns a loom-level swimlane DAG.

Parameters:

Name Type Description Default
dark bool | None

Force a dark (True) or light (False) palette. None (the default) follows the prefers-color-scheme media query.

None

explain()

Return a detailed text breakdown of the execution plan.

Includes dependency provenance, cache targets, lookup schedule, and a per-thread summary of sources, target, and steps. Sections with no data are omitted. Returns an empty string for non-plan modes.

summary()

Return a formatted, human-readable execution summary.

LoadedConfig

Wrapper around a hydrated config model returned by Context.load().

Provides access to the underlying model (Thread, Weave, or Loom) and a lazily-built execution plan for weave/loom configs. Proxies attribute access to the model for convenience.

Attributes:

Name Type Description
model Any

The hydrated domain model.

config_type str

Config kind ("thread", "weave", "loom").

config_name str

Name derived from the config file path.

model property

The underlying Thread, Weave, or Loom model.

config_type property

Config kind: "thread", "weave", or "loom".

config_name property

Name derived from the config file path.

execution_plan property

Lazily-built execution plans for weave/loom configs.

Returns None for thread configs. For weave configs, returns a single-element list. For loom configs, returns one plan per weave.

__init__(model, config_type, config_name, threads=None, weaves=None)

Initialize with a hydrated model and its metadata.

flow(*, dark=None)

Return a thread flow diagram for thread configs.

Generates an inline SVG showing the thread's processing pipeline: sources, joins, transforms, and target. Returns None for weave/loom configs (use per-thread flow in result reports instead).

Parameters:

Name Type Description Default
dark bool | None

Force a dark (True) or light (False) palette. None (the default) follows the prefers-color-scheme media query.

None

Returns:

Type Description
Any

A FlowDiagram (weevr.engine.display.FlowDiagram), or None for non-thread configs.

__getattr__(name)

Proxy attribute access to the underlying model.
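The wrapper pattern described for LoadedConfig (attribute proxying plus a lazily computed plan) can be sketched with __getattr__ and functools.cached_property. LoadedConfigSketch and its build_plans callback are hypothetical; the real class takes threads and weaves arguments and builds its plans internally.

```python
from functools import cached_property
from types import SimpleNamespace
from typing import Any, Callable


class LoadedConfigSketch:
    """Hypothetical wrapper: proxies unknown attributes to the model, builds plans lazily."""

    def __init__(self, model: Any, config_type: str, build_plans: Callable[[Any], list]):
        self._model = model
        self._config_type = config_type
        self._build_plans = build_plans  # hypothetical plan builder, called at most once

    @property
    def model(self) -> Any:
        return self._model

    @cached_property
    def execution_plan(self):
        # Threads have no plan; weaves and looms build theirs on first access only.
        if self._config_type == "thread":
            return None
        return self._build_plans(self._model)

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails, so wrapper attributes take precedence.
        if name.startswith("_"):
            raise AttributeError(name)  # avoid recursion before __init__ sets _model
        return getattr(self._model, name)


# Usage with a stand-in model object.
demo = LoadedConfigSketch(SimpleNamespace(name="orders_weave"), "weave", lambda m: [f"plan for {m.name}"])
```

The leading-underscore guard matters when the object is inspected before __init__ finishes (for example during copying or unpickling); without it, looking up self._model would call __getattr__ again and recurse indefinitely.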

_format_duration(ms)

Format milliseconds as a human-readable duration string.

Parameters:

Name Type Description Default
ms int | float

Duration in milliseconds.

required

Returns:

Type Description
str

Formatted string like "42ms", "1.2s", or "2m 30.0s".
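A sketch that reproduces the three example formats above. The thresholds (below one second, below one minute, otherwise minutes plus seconds) are inferred from those examples and are assumptions, as is rounding sub-second values to whole milliseconds. format_duration is a hypothetical name, not the private helper itself.

```python
def format_duration(ms: float) -> str:
    """Format milliseconds as '42ms', '1.2s', or '2m 30.0s' (assumed thresholds)."""
    if ms < 1000:
        return f"{round(ms)}ms"
    seconds = ms / 1000
    if seconds < 60:
        return f"{seconds:.1f}s"
    minutes, rem = divmod(seconds, 60)
    return f"{int(minutes)}m {rem:.1f}s"
```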