Context API

The Context class is the primary entry point for running weevr pipelines. It manages configuration loading and parameter injection, and it orchestrates thread, weave, and loom execution.

weevr.context

Context class — the user-facing entry point for weevr.

logger = logging.getLogger(__name__) module-attribute

ConfigError

Bases: WeevError

Base exception for configuration-related errors.

__init__(message, cause=None, file_path=None, config_key=None)

Initialize ConfigError.

Parameters:

  • message (str, required): Human-readable error message.
  • cause (Exception | None, default None): Optional underlying exception.
  • file_path (str | None, default None): Path to the config file where the error occurred.
  • config_key (str | None, default None): Specific config key that caused the error.

__str__()

Return string representation with context.

DataValidationError

Bases: WeevError

Raised when a fatal-severity validation rule has failing rows.

When any row fails a validation rule declared with severity: fatal, the thread aborts immediately without writing data. Downgrade to severity: error to quarantine failing rows instead of aborting.
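The difference between the two severities can be sketched in plain Python. This is a hypothetical illustration, not weevr's implementation: rules are modeled as (name, predicate, severity) tuples, rows as dicts, and DataValidationError is a local stand-in class. Only the "fatal" and "error" severities described above are modeled.

```python
from typing import Any, Callable

Row = dict[str, Any]
Rule = tuple[str, Callable[[Row], bool], str]  # (name, predicate, severity)


class DataValidationError(Exception):
    """Local stand-in for weevr's DataValidationError."""


def apply_rules(rows: list[Row], rules: list[Rule]) -> tuple[list[Row], list[Row]]:
    """Split rows into (valid, quarantined); abort on any fatal failure."""
    valid: list[Row] = []
    quarantined: list[Row] = []
    for row in rows:
        failed = [(name, severity) for name, check, severity in rules if not check(row)]
        fatal = [name for name, severity in failed if severity == "fatal"]
        if fatal:
            # Abort before any rows are handed back for writing.
            raise DataValidationError(f"fatal rule(s) failed: {fatal}")
        if any(severity == "error" for _, severity in failed):
            quarantined.append(row)
        else:
            valid.append(row)
    return valid, quarantined


rules: list[Rule] = [
    ("amount_positive", lambda r: r["amount"] > 0, "error"),
    ("id_present", lambda r: r["id"] is not None, "fatal"),
]
valid, quarantined = apply_rules(
    [{"id": 1, "amount": 10}, {"id": 2, "amount": -5}], rules
)
print(len(valid), len(quarantined))
```

Because the exception is raised inside the loop, a single fatal failure anywhere means no rows are returned at all, which mirrors the "abort without writing" behavior.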

ExecutionError

Bases: WeevError

Base exception for execution-time errors.

Carries optional execution context to pinpoint where a failure occurred within a thread pipeline.

__init__(message, cause=None, thread_name=None, step_index=None, step_type=None, source_name=None)

Initialize ExecutionError.

Parameters:

  • message (str, required): Human-readable error message.
  • cause (Exception | None, default None): Optional underlying exception.
  • thread_name (str | None, default None): Name of the thread where the error occurred.
  • step_index (int | None, default None): Zero-based index of the pipeline step that failed.
  • step_type (str | None, default None): Step type key (e.g. "filter", "join") that failed.
  • source_name (str | None, default None): Source alias that caused the error.

__str__()

Return string representation with execution context.

ModelValidationError

Bases: ConfigError

Raised when a fully resolved config fails to hydrate into a typed model.

This occurs after variable resolution and inheritance, when the concrete values are validated through the Pydantic domain model (Thread, Weave, or Loom). Semantic constraints that span multiple fields are checked here.

LogLevel

Bases: StrEnum

Configurable log level for weevr execution.

Controls the verbosity of structured logging output during pipeline execution. Maps to Python logging levels internally.

Loom

Bases: FrozenBase

A deployment unit containing weave references with optional shared defaults.

Thread

Bases: FrozenBase

Complete domain model for a thread configuration.

A thread is the smallest unit of work: one or more sources, a sequence of transformation steps, and a single target.

Weave

Bases: FrozenBase

A collection of thread references with optional shared defaults.

ExecutionMode

Bases: StrEnum

Execution modes for Context.run().

Attributes:

  • EXECUTE: Full execution (read, transform, write). This is the default.
  • VALIDATE: Config, DAG, and source-existence checks; nothing is executed.
  • PLAN: Build and return execution plans without executing.
  • PREVIEW: Execute against sampled data without writing.

LoadedConfig

Wrapper around a hydrated config model returned by Context.load().

Provides access to the underlying model (Thread, Weave, or Loom) and a lazily-built execution plan for weave/loom configs. Proxies attribute access to the model for convenience.

Attributes:

  • model (Any): The hydrated domain model.
  • config_type (str): Config kind ("thread", "weave", or "loom").
  • config_name (str): Name derived from the config file path.

model property

The underlying Thread, Weave, or Loom model.

config_type property

Config kind: "thread", "weave", or "loom".

config_name property

Name derived from the config file path.

execution_plan property

Lazily-built execution plans for weave/loom configs.

Returns None for thread configs. For weave configs, returns a single-element list. For loom configs, returns one plan per weave.

__init__(model, config_type, config_name, threads=None, weaves=None)

Initialize with a hydrated model and its metadata.

__getattr__(name)

Proxy attribute access to the underlying model.
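The attribute proxying described above follows the standard Python __getattr__ delegation pattern. The following is a minimal, hypothetical sketch of that pattern; the class name ModelProxy and the SimpleNamespace stand-in model are illustrative and are not weevr's implementation. Because Python calls __getattr__ only after normal attribute lookup fails, the wrapper's own properties take precedence over same-named model fields.

```python
from types import SimpleNamespace
from typing import Any


class ModelProxy:
    """Hypothetical wrapper that delegates unknown attribute lookups to a model."""

    def __init__(self, model: Any, config_type: str, config_name: str) -> None:
        self._model = model
        self._config_type = config_type
        self._config_name = config_name

    @property
    def model(self) -> Any:
        return self._model

    @property
    def config_type(self) -> str:
        return self._config_type

    @property
    def config_name(self) -> str:
        return self._config_name

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal lookup fails, so the properties above
        # shadow any same-named fields on the wrapped model.
        if name.startswith("_"):
            # Avoid infinite recursion if _model is read before __init__ sets it.
            raise AttributeError(name)
        return getattr(self._model, name)


# Stand-in for a hydrated Weave model.
weave_model = SimpleNamespace(name="sales", threads=["orders", "customers"])
loaded = ModelProxy(weave_model, "weave", "sales")
print(loaded.config_type, loaded.threads)
```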

RunResult

Unified result returned by Context.run() for all execution modes.

Provides a consistent interface regardless of config type (thread, weave, loom) or execution mode (execute, validate, plan, preview). Mode-specific fields are None when not applicable.

Attributes:

  • status: Aggregate outcome, one of "success", "failure", or "partial".
  • mode: The execution mode that produced this result.
  • config_type: Config kind that was executed ("thread", "weave", or "loom").
  • config_name: Name derived from the config file path.
  • duration_ms: Wall-clock duration in milliseconds.
  • detail: Underlying engine result (execute mode only).
  • telemetry: Structured telemetry data (execute mode only).
  • execution_plan: Resolved execution plans (plan mode only).
  • preview_data: Output DataFrames keyed by thread name (preview mode only).
  • validation_errors: Error messages from validation checks (validate mode only).
  • warnings (list[str]): Non-fatal messages (e.g., zero threads matched a filter).

__init__(*, status, mode, config_type, config_name, duration_ms=0, detail=None, telemetry=None, execution_plan=None, preview_data=None, validation_errors=None, warnings=None)

Initialize a RunResult with execution outcome and telemetry.

summary()

Return a formatted, human-readable execution summary.

_ResolvedConfig dataclass

Intermediate container for resolved config data.

Context

Entry point for all weevr operations.

Wraps a SparkSession with a project reference, resolved parameters, and execution configuration. Provides run() for execution and load() for model inspection.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • project (str | Path, required): Project identifier. Accepts three forms:
      • Simple name (e.g., "my_project"): resolved via the default lakehouse at /lakehouse/default/Files/<name>.weevr/.
      • Qualified (with workspace and lakehouse): resolved via a OneLake ABFS path.
      • Direct path (e.g., "/path/to/project.weevr"): used as-is.
  • params (dict[str, Any] | None, default None): Runtime parameter overrides.
  • log_level (str, default 'standard'): Logging verbosity: "minimal", "standard" (default), "verbose", or "debug".
  • workspace (str | None, default None): Fabric workspace ID for cross-lakehouse resolution.
  • lakehouse (str | None, default None): Fabric lakehouse ID for cross-lakehouse resolution.

spark property

The underlying SparkSession.

params property

Runtime parameter overrides.

log_level property

Configured logging verbosity.

project_root property

Resolved project root path.

__init__(spark, project, *, params=None, log_level='standard', workspace=None, lakehouse=None)

Initialize a Context with a SparkSession and project reference.

load(path)

Load and validate a config file, returning a model wrapper.

Parses, resolves, and hydrates the config at path (relative to the project root) without executing anything. The returned LoadedConfig exposes the underlying model and, for weave/loom configs, a lazily-built execution plan.

Parameters:

  • path (str | Path, required): Path to a config file, relative to the project root.

Returns:

  • LoadedConfig: A LoadedConfig wrapping the hydrated model.

run(path, *, mode='execute', tags=None, threads=None, sample_rows=100)

Run a config file in the specified mode.

Parameters:

  • path (str | Path, required): Path to a config file, relative to the project root.
  • mode (str, default 'execute'): Execution mode: "execute" (default), "validate", "plan", or "preview".
  • tags (list[str] | None, default None): Run only threads matching any of these tags. Cannot be combined with threads.
  • threads (list[str] | None, default None): Run only threads with these names. Cannot be combined with tags.
  • sample_rows (int, default 100): Maximum rows for preview mode.

Returns:

  • RunResult: A RunResult describing the outcome.

Raises:

  • ValueError: If mode is invalid or both tags and threads are provided.
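Because run() takes a mode and returns a RunResult whose mode-specific fields are filled in accordingly, one natural pattern is to validate a config before executing it. The sketch below assumes ctx is an already-constructed Context; validate_then_execute is a hypothetical helper, and StubContext is a stand-in so the snippet runs without Spark or weevr installed.

```python
from types import SimpleNamespace


def validate_then_execute(ctx, path, **run_kwargs):
    """Hypothetical helper: execute `path` only if validate mode reports no errors."""
    check = ctx.run(path, mode="validate", **run_kwargs)
    # validation_errors is populated in validate mode; None or [] means no errors.
    if check.validation_errors:
        for message in check.validation_errors:
            print(f"validation error: {message}")
        return check
    return ctx.run(path, mode="execute", **run_kwargs)


class StubContext:
    """Stand-in for weevr's Context so the sketch runs without a SparkSession."""

    def __init__(self, errors):
        self.errors = errors
        self.calls = []

    def run(self, path, *, mode="execute", tags=None, threads=None, sample_rows=100):
        self.calls.append(mode)
        if mode == "validate":
            status = "failure" if self.errors else "success"
            return SimpleNamespace(status=status, mode=mode,
                                   validation_errors=list(self.errors))
        return SimpleNamespace(status="success", mode=mode, validation_errors=None)


ctx = StubContext(errors=[])
result = validate_then_execute(ctx, "sales.weave", tags=["daily"])
print(result.status, ctx.calls)
```

With a real session, pass a Context built as Context(spark, "my_project") instead of the stub. Since tags and threads are mutually exclusive, supply at most one of them in run_kwargs.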

apply_inheritance(loom_defaults, weave_defaults, thread_config)

Apply multi-level inheritance cascade.

Cascade order, from lowest to highest priority:

  1. loom_defaults (lowest)
  2. weave_defaults
  3. thread_config (highest)

Parameters:

  • loom_defaults (dict[str, Any] | None, required): Defaults from the loom level.
  • weave_defaults (dict[str, Any] | None, required): Defaults from the weave level.
  • thread_config (dict[str, Any], required): Thread-specific config.

Returns:

  • dict[str, Any]: Fully merged config, with thread values taking precedence.
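The cascade can be pictured as successive dictionary merges in which each later layer wins. The sketch below is hypothetical: it assumes nested dictionaries are merged key by key (a deep merge) while lists and scalars are replaced wholesale, and weevr's actual rules for nested keys may differ. The keys target, mode, and tags are illustrative.

```python
import copy
from typing import Any


def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict where `override` wins; nested dicts merge recursively."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def cascade(loom_defaults, weave_defaults, thread_config):
    """Hypothetical cascade: loom (lowest), then weave, then thread (highest)."""
    result: dict[str, Any] = {}
    for layer in (loom_defaults, weave_defaults, thread_config):
        if layer:  # None or empty layers contribute nothing
            result = deep_merge(result, layer)
    return result


merged = cascade(
    {"target": {"mode": "overwrite", "format": "delta"}, "tags": ["nightly"]},
    {"target": {"mode": "append"}},
    {"target": {"path": "Tables/orders"}},
)
print(merged)
```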

expand_foreach(steps)

Expand foreach macro blocks into repeated step sequences.

Each foreach block in the steps list is replaced by its template steps repeated once per value, with {var} placeholders substituted.

Non-foreach entries pass through unchanged.

Parameters:

  • steps (list[dict[str, Any]], required): Raw step list (dicts), possibly containing foreach blocks.

Returns:

  • list[dict[str, Any]]: Expanded step list with all foreach blocks replaced.

Raises:

  • ConfigError: If a foreach block is missing required fields.
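A minimal sketch of the expansion follows. The foreach block shape used here ({"foreach": {"values": [...], "as": "<var>", "steps": [...]}}) and the step names filter and derive are hypothetical placeholders, since the exact schema is not documented on this page; ConfigError is a local stand-in.

```python
from typing import Any


class ConfigError(Exception):
    """Local stand-in for weevr's ConfigError."""


def _substitute(node: Any, var: str, value: Any) -> Any:
    # Replace "{var}" placeholders in every string nested inside the template.
    if isinstance(node, str):
        return node.replace("{" + var + "}", str(value))
    if isinstance(node, list):
        return [_substitute(item, var, value) for item in node]
    if isinstance(node, dict):
        return {key: _substitute(item, var, value) for key, item in node.items()}
    return node


def expand_foreach_sketch(steps: list[dict[str, Any]]) -> list[dict[str, Any]]:
    expanded: list[dict[str, Any]] = []
    for step in steps:
        block = step.get("foreach")
        if block is None:
            expanded.append(step)  # non-foreach entries pass through unchanged
            continue
        missing = [field for field in ("values", "as", "steps") if field not in block]
        if missing:
            raise ConfigError(f"foreach block missing required fields: {missing}")
        # Repeat the whole template once per value, in declared order.
        for value in block["values"]:
            for template in block["steps"]:
                expanded.append(_substitute(template, block["as"], value))
    return expanded


steps = [
    {"filter": {"expr": "amount > 0"}},
    {"foreach": {
        "values": ["email", "phone"],
        "as": "col",
        "steps": [{"derive": {"name": "{col}_clean", "expr": "trim({col})"}}],
    }},
]
expanded = expand_foreach_sketch(steps)
print(expanded)
```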

detect_config_type_from_extension(path)

Detect config type from file extension.

Parameters:

  • path (str | Path, required): Path to the config file.

Returns:

  • str | None: The config type string for a typed extension (.thread, .weave, .loom), or None for .yaml/.yml files.

Raises:

  • ConfigError: If the extension is .yaml or .yml and the calling context requires a typed extension.
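Extension-based detection reduces to a suffix lookup. In the hypothetical sketch below, the mapping of extensions to type strings, case-insensitive matching, and raising ValueError for unrecognized extensions are all assumptions rather than documented weevr behavior.

```python
from pathlib import Path
from typing import Optional

# Typed extensions named in the docs; the mapping to type strings is assumed.
TYPED_EXTENSIONS = {".thread": "thread", ".weave": "weave", ".loom": "loom"}
PLAIN_YAML = {".yaml", ".yml"}


def detect_type_sketch(path: "str | Path") -> Optional[str]:
    suffix = Path(path).suffix.lower()  # case-insensitive match is an assumption
    if suffix in TYPED_EXTENSIONS:
        return TYPED_EXTENSIONS[suffix]
    if suffix in PLAIN_YAML:
        return None  # the caller must determine the type some other way
    raise ValueError(f"unrecognized config extension: {suffix!r}")


for p in ("weaves/sales.weave", "threads/orders.thread", "legacy/orders.yaml"):
    print(p, "->", detect_type_sketch(p))
```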

extract_config_version(raw)

Extract and parse config_version from a config dict.

Parameters:

  • raw (dict[str, Any], required): Parsed config dictionary.

Returns:

  • tuple[int, int]: Tuple of (major, minor) version numbers.

Raises:

  • ConfigParseError: If config_version is missing or has an invalid format.

parse_yaml(path)

Parse a YAML file and return its contents.

Parameters:

  • path (str | Path, required): Path to the YAML file.

Returns:

  • dict[str, Any]: Parsed YAML content as a dictionary.

Raises:

  • ConfigParseError: If the file is not found, is unreadable, or contains invalid YAML syntax.

validate_config_version(version, config_type)

Validate that the config version is supported.

Parameters:

  • version (tuple[int, int], required): Tuple of (major, minor) version.
  • config_type (str, required): Type of config (thread, weave, loom, params).

Raises:

  • ConfigVersionError: If the major version does not match the supported version.
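Taken together, extract_config_version and validate_config_version form a parse-then-check step. The sketch below is hypothetical: it assumes config_version is written as "<major>.<minor>", uses local stand-ins for ConfigParseError and ConfigVersionError, and the SUPPORTED_MAJOR table is an invented placeholder, not weevr's actual supported versions.

```python
import re
from typing import Any


class ConfigParseError(Exception):
    """Local stand-in for weevr's ConfigParseError."""


class ConfigVersionError(Exception):
    """Local stand-in for weevr's ConfigVersionError."""


# Hypothetical supported major versions per config type.
SUPPORTED_MAJOR = {"thread": 1, "weave": 1, "loom": 1, "params": 1}

_VERSION_RE = re.compile(r"^(\d+)\.(\d+)$")


def extract_version_sketch(raw: dict[str, Any]) -> tuple[int, int]:
    """Parse a config_version such as "1.0" into (major, minor)."""
    value = raw.get("config_version")
    if value is None:
        raise ConfigParseError("config_version is missing")
    match = _VERSION_RE.match(str(value).strip())
    if match is None:
        raise ConfigParseError(f"invalid config_version format: {value!r}")
    return int(match.group(1)), int(match.group(2))


def validate_version_sketch(version: tuple[int, int], config_type: str) -> None:
    """Accept any minor version; reject a major version other than the supported one."""
    major, _minor = version
    expected = SUPPORTED_MAJOR[config_type]
    if major != expected:
        raise ConfigVersionError(
            f"{config_type} config_version {major}.x is unsupported (expected {expected}.x)"
        )


version = extract_version_sketch({"config_version": "1.2"})
validate_version_sketch(version, "weave")
print(version)
```

One practical caveat: YAML parses an unquoted 1.10 as the float 1.1, so quoting the value ("1.10") is the only way to keep a two-digit minor version intact.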

build_param_context(runtime_params=None, config_defaults=None)

Build parameter context with proper priority layering.

Priority order, from highest to lowest:

  1. runtime_params
  2. config_defaults

Parameters:

  • runtime_params (dict[str, Any] | None, default None): Runtime parameter overrides.
  • config_defaults (dict[str, Any] | None, default None): Default parameters from config.

Returns:

  • dict[str, Any]: Merged parameter context dictionary with dotted key access support.
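The layering and dotted access can be sketched as follows. This is a hypothetical illustration: it assumes a shallow merge (a runtime value replaces the whole default under the same top-level key) and models "dotted key access" as walking nested dicts segment by segment; weevr's own context type may behave differently.

```python
from typing import Any


def build_context_sketch(runtime_params=None, config_defaults=None) -> dict[str, Any]:
    """Shallow-merge parameters: runtime overrides beat config defaults."""
    context: dict[str, Any] = dict(config_defaults or {})
    context.update(runtime_params or {})
    return context


def get_dotted(context: dict[str, Any], dotted_key: str) -> Any:
    """Look up "a.b.c" by walking nested dicts one segment at a time."""
    node: Any = context
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            raise KeyError(dotted_key)
        node = node[part]
    return node


ctx = build_context_sketch(
    runtime_params={"env": "prod"},
    config_defaults={"env": "dev", "source": {"schema": "raw"}},
)
print(ctx["env"], get_dotted(ctx, "source.schema"))
```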

resolve_references(config, config_type, project_root, runtime_params=None, visited=None)

Resolve references to other config files.

Handles both external references (ref key) and inline definitions (name + body keys). Recursively loads referenced configs with circular reference detection.

Parameters:

  • config (dict[str, Any], required): Config dict to resolve references in.
  • config_type (str, required): Type of this config ('weave' or 'loom').
  • project_root (Path, required): Absolute path to the .weevr project directory.
  • runtime_params (dict[str, Any] | None, default None): Runtime parameters to pass to child configs.
  • visited (set[str] | None, default None): Set of already-visited ref strings (for cycle detection).

Returns:

  • dict[str, Any]: Config dict with resolved child configs attached under the '_resolved_threads' or '_resolved_weaves' key.

Raises:

  • ReferenceResolutionError: If a referenced file is not found or a circular reference is detected.
  • ConfigError: If an inline definition is missing a name field.
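Recursive loading with cycle detection can be sketched without touching the filesystem. In this hypothetical example, configs is an in-memory stand-in for files on disk, each entry lists its refs under an invented "refs" key, and ReferenceResolutionError is a local stand-in. Unlike the documented visited set, the sketch tracks the current path as a tuple so the error can report the cycle in order; tracking only the current branch also means two parents may share a child without triggering a false cycle.

```python
from typing import Any


class ReferenceResolutionError(Exception):
    """Local stand-in for weevr's ReferenceResolutionError."""


def resolve_tree(ref: str, configs: dict[str, dict[str, Any]],
                 path: tuple[str, ...] = ()) -> dict[str, Any]:
    """Recursively resolve `ref`, failing on missing entries or cycles."""
    if ref in path:
        cycle = " -> ".join((*path, ref))
        raise ReferenceResolutionError(f"circular reference detected: {cycle}")
    if ref not in configs:
        raise ReferenceResolutionError(f"referenced file not found: {ref}")
    children = configs[ref].get("refs", [])
    return {
        "ref": ref,
        # Extend the path only along this branch.
        "children": [resolve_tree(child, configs, (*path, ref)) for child in children],
    }


configs = {
    "main.loom": {"refs": ["sales.weave", "finance.weave"]},
    "sales.weave": {"refs": ["orders.thread"]},
    "finance.weave": {"refs": ["orders.thread"]},
    "orders.thread": {},
}
tree = resolve_tree("main.loom", configs)
print([child["ref"] for child in tree["children"]])
```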

resolve_variables(config, context)

Recursively resolve variable references in config.

Supported syntax:

  • ${var}: simple variable reference (error if not found).
  • ${var:-default}: variable with a fallback default.

Parameters:

  • config (dict[str, Any] | list[Any] | str | Any, required): Config structure to resolve (dict, list, str, or primitive).
  • context (dict[str, Any], required): Parameter context for variable lookup.

Returns:

  • Any: Config with all variables resolved.

Raises:

  • VariableResolutionError: If a variable is not found and no default is provided.
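Both forms can be handled with one regular expression and a recursive walk. The sketch below is a hypothetical illustration, not weevr's implementation: it looks names up in a flat context, always substitutes the string form of a value, and uses a local VariableResolutionError.

```python
import re
from typing import Any


class VariableResolutionError(Exception):
    """Local stand-in for weevr's VariableResolutionError."""


# ${name} or ${name:-default}; the default may be empty and may not contain "}".
_VAR_RE = re.compile(r"\$\{([A-Za-z_][\w.]*)(?::-([^}]*))?\}")


def resolve_sketch(config: Any, context: dict[str, Any]) -> Any:
    """Recursively substitute ${...} references in strings nested in config."""
    if isinstance(config, dict):
        return {key: resolve_sketch(value, context) for key, value in config.items()}
    if isinstance(config, list):
        return [resolve_sketch(item, context) for item in config]
    if not isinstance(config, str):
        return config  # numbers, booleans, and None pass through unchanged

    def replace(match):
        name, default = match.group(1), match.group(2)
        if name in context:
            return str(context[name])
        if default is not None:
            return default  # ${name:-} yields an empty string
        raise VariableResolutionError(f"variable not found: {name}")

    return _VAR_RE.sub(replace, config)


resolved = resolve_sketch(
    {"path": "Tables/${env}/orders", "batch": 500, "tags": ["${tier:-bronze}"]},
    {"env": "dev"},
)
print(resolved)
```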

validate_schema(raw, config_type)

Validate a raw config dict against the appropriate pre-resolution schema.

Parameters:

  • raw (dict[str, Any], required): Raw config dictionary (variables not yet resolved).
  • config_type (str, required): Type of config (thread, weave, loom, params).

Returns:

  • BaseModel: Validated Pydantic model instance.

Raises:

  • ConfigSchemaError: If validation fails.

execute_thread(spark, thread, collector=None, parent_span_id=None, cached_lookups=None, weave_lookups=None)

Execute a single thread from sources through transforms to a Delta target.

Execution order:

  1. Resolve lookup-based sources from cached or on-demand DataFrames.
  2. Read all remaining declared sources into DataFrames.
      • For incremental_watermark: load the prior high-water mark (HWM), apply the filter, and capture the new HWM.
      • For cdc: read via CDF or a generic CDC source.
  3. Set the primary (first) source as the working DataFrame.
  4. Run pipeline steps against the working DataFrame.
  5. Run validation rules (if configured); quarantine or abort on failures.
  6. Compute business keys and hashes if configured.
  7. Resolve the target write path.
  8. Apply target column mapping.
  9. Write to the Delta target.
      • For cdc: use CDC merge routing instead of a standard write.
  10. Persist watermark state (if applicable).
  11. Run post-write assertions (if configured).
  12. Build telemetry and return a ThreadResult.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • thread (Thread, required): Thread configuration to execute.
  • collector (SpanCollector | None, default None): Optional span collector for telemetry. When provided, a thread span is created and finalized.
  • parent_span_id (str | None, default None): Optional parent span ID for trace tree linkage.
  • cached_lookups (dict[str, Any] | None, default None): Pre-materialized lookup DataFrames keyed by lookup name. The runtime type is dict[str, DataFrame]; Any avoids coupling the signature to PySpark types.
  • weave_lookups (dict[str, Lookup] | None, default None): Weave-level lookup definitions for on-demand resolution of non-materialized lookups.

Returns:

  • ThreadResult: A ThreadResult with status, row count, write mode, target path, and optional telemetry.

Raises:

  • ExecutionError: If any step fails, with thread_name set on the error.
  • DataValidationError: If a fatal-severity validation rule fails.
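The incremental_watermark read in step 2 follows a load, filter, capture pattern. The plain-Python sketch below uses a list of dicts in place of a DataFrame; the function name incremental_filter, the column updated_at, and the strictly-greater-than comparison are assumptions for illustration. Because the documented order persists the watermark (step 10) only after the write (step 9), a failed write would leave the stored HWM where it was.

```python
from typing import Any, Optional

Row = dict[str, Any]


def incremental_filter(rows: list[Row], column: str,
                       prior_hwm: Optional[Any]) -> tuple[list[Row], Optional[Any]]:
    """Keep rows newer than the prior HWM and compute the next HWM."""
    if prior_hwm is None:
        new_rows = list(rows)  # first run: no watermark yet, read everything
    else:
        new_rows = [row for row in rows if row[column] > prior_hwm]
    # If nothing new arrived, keep the previous watermark rather than resetting it.
    new_hwm = max((row[column] for row in new_rows), default=prior_hwm)
    return new_rows, new_hwm


source = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-03"},
    {"id": 3, "updated_at": "2024-01-05"},
]
batch, hwm = incremental_filter(source, "updated_at", prior_hwm="2024-01-02")
print([row["id"] for row in batch], hwm)
```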

build_plan(weave_name, threads, thread_entries)

Build an execution plan from a weave's threads and declared/inferred dependencies.

Performs DAG construction, cycle detection, topological sort, and cache analysis to produce an immutable ExecutionPlan.

Parameters:

  • weave_name (str, required): Name of the weave being planned.
  • threads (dict[str, Thread], required): Mapping of thread name to Thread config.
  • thread_entries (list[ThreadEntry], required): Ordered thread entries from the weave config, which may carry explicit dependency declarations.

Returns:

  • ExecutionPlan: An immutable ExecutionPlan.

Raises:

  • ConfigError: If a circular dependency is detected (the message includes the cycle path), or if an explicit dependency references a thread that does not exist in threads.
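The DAG, cycle-detection, and topological-sort steps can be illustrated with a level-by-level variant of Kahn's algorithm, which also yields the parallel groups that execute_weave consumes. This is a hypothetical sketch: dependencies are given directly as a dict of sets rather than derived from Thread configs, cache analysis is omitted, and on a cycle it reports every unresolved thread rather than the exact cycle path that weevr includes in its message.

```python
class ConfigError(Exception):
    """Local stand-in for weevr's ConfigError."""


def plan_groups(deps: dict[str, set[str]]) -> list[list[str]]:
    """Kahn-style topological sort that emits one group per dependency level."""
    for thread, upstream in deps.items():
        unknown = upstream - set(deps)
        if unknown:
            raise ConfigError(f"{thread} depends on unknown thread(s): {sorted(unknown)}")
    remaining = {thread: set(upstream) for thread, upstream in deps.items()}
    groups: list[list[str]] = []
    while remaining:
        # Every thread whose dependencies are all satisfied can run in parallel.
        ready = sorted(thread for thread, upstream in remaining.items() if not upstream)
        if not ready:
            raise ConfigError(f"circular dependency among: {sorted(remaining)}")
        groups.append(ready)
        for thread in ready:
            del remaining[thread]
        for upstream in remaining.values():
            upstream.difference_update(ready)
    return groups


plan = plan_groups({
    "orders": set(),
    "customers": set(),
    "sales": {"orders", "customers"},
    "report": {"sales"},
})
print(plan)
```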

execute_loom(spark, loom, weaves, threads, params=None)

Execute weaves sequentially in declared order.

Iterates over the weaves listed in loom.weaves, builds an ExecutionPlan (weevr.engine.planner.ExecutionPlan) for each, and executes them via execute_weave. Creates a root span collector for telemetry and passes it through the execution tree.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • loom (Loom, required): Loom configuration declaring the ordered list of weave names.
  • weaves (dict[str, Weave], required): Mapping of weave name to Weave config (weevr.model.weave.Weave).
  • threads (dict[str, dict[str, Thread]], required): Nested mapping of weave_name → thread_name → Thread config.
  • params (dict[str, Any] | None, default None): Parameters for condition evaluation at weave and thread levels.

Returns:

  • LoomResult: A LoomResult (weevr.engine.result.LoomResult) with aggregate status and per-weave results.

execute_weave(spark, plan, threads, collector=None, parent_span_id=None, thread_conditions=None, params=None, weave_span_label=None, pre_steps=None, post_steps=None, lookups=None, variables=None)

Execute threads according to the execution plan.

Processes each parallel group sequentially, submitting the threads within a group to a concurrent.futures.ThreadPoolExecutor for concurrent execution. Respects the on_failure config on each thread and manages the cache lifecycle via weevr.engine.cache_manager.CacheManager.

When hooks are provided, the lifecycle is:

  1. Initialize VariableContext from variables.
  2. Materialize lookups (materialize=True).
  3. Execute pre_steps.
  4. Execute threads (with cached lookup DataFrames).
  5. Execute post_steps.
  6. Clean up lookups.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • plan (ExecutionPlan, required): Immutable weevr.engine.planner.ExecutionPlan produced by weevr.engine.planner.build_plan.
  • threads (dict[str, Thread], required): Mapping of thread name to weevr.model.thread.Thread config.
  • collector (SpanCollector | None, default None): Optional span collector for telemetry. When provided, per-thread collectors are created and merged after execution.
  • parent_span_id (str | None, default None): Optional parent span ID for trace tree linkage.
  • thread_conditions (dict[str, ConditionSpec] | None, default None): Mapping of thread name to condition spec. Threads whose condition evaluates to False are skipped.
  • params (dict[str, Any] | None, default None): Parameters for condition evaluation.
  • weave_span_label (str | None, default None): Label for the weave telemetry span. When set, it is used instead of plan.weave_name; typically the weave's qualified key.
  • pre_steps (Sequence[HookStep] | None, default None): Optional pre-execution hook steps.
  • post_steps (Sequence[HookStep] | None, default None): Optional post-execution hook steps.
  • lookups (dict[str, Lookup] | None, default None): Optional weave-level lookup definitions.
  • variables (dict[str, VariableSpec] | None, default None): Optional weave-level variable specs.

Returns:

  • WeaveResult: A WeaveResult (weevr.engine.result.WeaveResult) with aggregate status and per-thread results.
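The group-by-group scheduling can be sketched with the standard library's ThreadPoolExecutor. In this hypothetical example, run_groups and run_thread are illustrative names, each thread is just a function call, and on_failure policies, condition checks, hooks, and cache management are deliberately left out; a failure is recorded per thread rather than stopping the run.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def run_groups(groups: list[list[str]], run_thread: Callable[[str], Any],
               max_workers: int = 4) -> dict[str, tuple[str, Any]]:
    """Run groups one after another; threads inside a group run concurrently."""
    results: dict[str, tuple[str, Any]] = {}
    for group in groups:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {name: pool.submit(run_thread, name) for name in group}
        # Leaving the with-block waits for every thread in this group to finish.
        for name, future in futures.items():
            try:
                results[name] = ("success", future.result())
            except Exception as exc:  # record the failure instead of aborting
                results[name] = ("failure", exc)
    return results


outcome = run_groups([["customers", "orders"], ["sales"]], lambda name: f"{name} done")
print(outcome)
```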

configure_logging(log_level=LogLevel.STANDARD, logger_name='weevr')

Configure a weevr logger with structured JSON output.

Sets up a logger with a StructuredJsonFormatter handler at the Python logging level that corresponds to the given weevr LogLevel.

Parameters:

  • log_level (LogLevel, default STANDARD): weevr log level to configure.
  • logger_name (str, default 'weevr'): Name of the logger to configure.

Returns:

  • Logger: Configured logger instance.
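Structured JSON logging of this kind can be built from the standard logging module alone. The sketch below is hypothetical: JsonFormatter stands in for StructuredJsonFormatter, its field names are invented, and because the mapping from LogLevel values to Python levels is not documented here, configure_json_logger takes a plain Python logging level. Writing to an io.StringIO buffer keeps the output inspectable.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical stand-in for StructuredJsonFormatter: one JSON object per record."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        return json.dumps(payload)


def configure_json_logger(name: str, level: int, stream) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.handlers.clear()  # avoid duplicate handlers on repeated calls
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.propagate = False  # keep records out of the root logger's handlers
    return logger


buffer = io.StringIO()
log = configure_json_logger("weevr.sketch", logging.INFO, buffer)
log.info("thread %s finished", "orders")
log.debug("suppressed at INFO level")
print(buffer.getvalue())
```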