Context API
The Context class is the primary entry point for running weevr pipelines.
It manages configuration loading and parameter injection, and orchestrates
thread, weave, and loom execution.
weevr.context
Context class — the user-facing entry point for weevr.
`logger = logging.getLogger(__name__)` (module attribute)
ConfigError
Bases: WeevError
Base exception for configuration-related errors.
__init__(message, cause=None, file_path=None, config_key=None)
Initialize ConfigError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `message` | `str` | Human-readable error message. | required |
| `cause` | `Exception \| None` | Optional underlying exception. | `None` |
| `file_path` | `str \| None` | Path to the config file where the error occurred. | `None` |
| `config_key` | `str \| None` | Specific config key that caused the error. | `None` |
__str__()
Return string representation with context.
DataValidationError
Bases: WeevError
Raised when a fatal-severity validation rule has failing rows.
When any row fails a validation rule declared with severity: fatal,
the thread aborts immediately without writing data. Downgrade to
severity: error to quarantine failing rows instead of aborting.
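The difference between the two severities can be sketched with plain Python rows instead of Spark DataFrames. `Rule`, `apply_rule`, and the local `DataValidationError` class are hypothetical stand-ins written for illustration, not weevr's API; the sketch only shows the control flow: `fatal` raises before anything is written, while `error` splits rows into kept and quarantined sets.

```python
from dataclasses import dataclass
from typing import Callable


class DataValidationError(Exception):
    """Hypothetical local stand-in for weevr's DataValidationError."""


@dataclass
class Rule:
    # Hypothetical rule shape: a name, a per-row predicate, and a severity.
    name: str
    check: Callable[[dict], bool]
    severity: str  # "fatal" or "error"


def apply_rule(rows: list[dict], rule: Rule) -> tuple[list[dict], list[dict]]:
    """Return (valid_rows, quarantined_rows), or raise on fatal failures."""
    failing = [row for row in rows if not rule.check(row)]
    if failing and rule.severity == "fatal":
        # Fatal: abort before any data is written.
        raise DataValidationError(f"rule {rule.name!r} failed on {len(failing)} row(s)")
    # Error: keep passing rows and set failing rows aside for quarantine.
    valid = [row for row in rows if rule.check(row)]
    return valid, failing


rows = [{"id": 1, "amount": 10}, {"id": 2, "amount": -5}]
valid, quarantined = apply_rule(rows, Rule("positive_amount", lambda r: r["amount"] > 0, "error"))
print(valid)        # [{'id': 1, 'amount': 10}]
print(quarantined)  # [{'id': 2, 'amount': -5}]
```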
ExecutionError
Bases: WeevError
Base exception for execution-time errors.
Carries optional execution context to pinpoint where a failure occurred within a thread pipeline.
__init__(message, cause=None, thread_name=None, step_index=None, step_type=None, source_name=None)
Initialize ExecutionError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `message` | `str` | Human-readable error message. | required |
| `cause` | `Exception \| None` | Optional underlying exception. | `None` |
| `thread_name` | `str \| None` | Name of the thread where the error occurred. | `None` |
| `step_index` | `int \| None` | Zero-based index of the pipeline step that failed. | `None` |
| `step_type` | `str \| None` | Step type key (e.g. `"filter"`, `"join"`) that failed. | `None` |
| `source_name` | `str \| None` | Source alias that caused the error. | `None` |
__str__()
Return string representation with execution context.
ModelValidationError
Bases: ConfigError
Raised when a fully resolved config fails to hydrate into a typed model.
This occurs after variable resolution and inheritance, when the concrete values are validated through the Pydantic domain model (Thread, Weave, or Loom). Semantic constraints that span multiple fields are checked here.
LogLevel
Bases: StrEnum
Configurable log level for weevr execution.
Controls the verbosity of structured logging output during pipeline execution. Maps to Python logging levels internally.
Loom
Bases: FrozenBase
A deployment unit containing weave references with optional shared defaults.
Thread
Bases: FrozenBase
Complete domain model for a thread configuration.
A thread is the smallest unit of work: one or more sources, a sequence of transformation steps, and a single target.
Weave
Bases: FrozenBase
A collection of thread references with optional shared defaults.
ExecutionMode
Bases: StrEnum
Execution modes for Context.run().
Attributes:
| Name | Description |
|---|---|
| `EXECUTE` | Full execution: read, transform, write (default). |
| `VALIDATE` | Config + DAG + source existence checks, no execution. |
| `PLAN` | Build and return execution plans without executing. |
| `PREVIEW` | Execute with sampled data, no writes. |
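A minimal sketch of how a mode string could be checked against these four values. It mirrors the enum with a `(str, Enum)` base so it also runs on Python versions without `StrEnum`; the lowercase values are assumed from the `mode='execute'` default of `Context.run()`, `parse_mode` is a hypothetical helper, and the `ValueError` matches what `run()` documents for an invalid mode.

```python
from enum import Enum


class ExecutionMode(str, Enum):
    """Local mirror of the documented modes (lowercase values assumed)."""

    EXECUTE = "execute"
    VALIDATE = "validate"
    PLAN = "plan"
    PREVIEW = "preview"


def parse_mode(mode: str) -> ExecutionMode:
    """Hypothetical helper: map a user-supplied string to an ExecutionMode."""
    try:
        return ExecutionMode(mode.strip().lower())
    except ValueError:
        allowed = ", ".join(m.value for m in ExecutionMode)
        raise ValueError(f"invalid mode {mode!r}; expected one of: {allowed}") from None


print(parse_mode("Preview").value)  # preview
```

Because the enum mixes in `str`, members compare equal to their plain string values, so callers can pass either form.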
LoadedConfig
Wrapper around a hydrated config model returned by Context.load().
Provides access to the underlying model (Thread, Weave, or Loom) and a lazily-built execution plan for weave/loom configs. Proxies attribute access to the model for convenience.
Attributes:
| Name | Type | Description |
|---|---|---|
| `model` | `Any` | The hydrated domain model. |
| `config_type` | `str` | Config kind (`"thread"`, `"weave"`, or `"loom"`). |
| `config_name` | `str` | Name derived from the config file path. |
model (property)
The underlying Thread, Weave, or Loom model.
config_type (property)
Config kind: "thread", "weave", or "loom".
config_name (property)
Name derived from the config file path.
execution_plan (property)
Lazily-built execution plans for weave/loom configs.
Returns None for thread configs. For weave configs, returns a
single-element list. For loom configs, returns one plan per weave.
__init__(model, config_type, config_name, threads=None, weaves=None)
Initialize with a hydrated model and its metadata.
__getattr__(name)
Proxy attribute access to the underlying model.
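The proxy-plus-lazy-plan behaviour described above follows a common Python pattern: `__getattr__` runs only when normal attribute lookup fails, so the wrapper's own attributes win and everything else falls through to the model, while `functools.cached_property` builds the plan once on first access. This is a sketch of that pattern, not weevr's implementation; `ConfigWrapper`, the `plan:` strings, and the `SimpleNamespace` model are hypothetical stand-ins.

```python
from functools import cached_property
from types import SimpleNamespace
from typing import Any, Optional


class ConfigWrapper:
    """Hypothetical wrapper showing attribute proxying and a lazily built plan."""

    def __init__(self, model: Any, config_type: str, config_name: str) -> None:
        self._model = model
        self._config_type = config_type
        self._config_name = config_name
        self.plan_builds = 0  # counts how many times the plan is actually built

    @property
    def config_type(self) -> str:
        return self._config_type

    @cached_property
    def execution_plan(self) -> Optional[list]:
        # Threads have no plan; a weave gets one plan, a loom one per weave.
        if self._config_type == "thread":
            return None
        self.plan_builds += 1
        if self._config_type == "weave":
            return [f"plan:{self._config_name}"]
        return [f"plan:{name}" for name in self._model.weaves]

    def __getattr__(self, name: str) -> Any:
        # Called only when normal lookup fails, so wrapper attributes win.
        if name == "_model":  # guard against recursion before __init__ runs
            raise AttributeError(name)
        return getattr(self._model, name)


loom_model = SimpleNamespace(name="nightly", weaves=["ingest", "publish"])
loaded = ConfigWrapper(loom_model, "loom", "nightly")
print(loaded.name)            # nightly (proxied to the model)
print(loaded.execution_plan)  # ['plan:ingest', 'plan:publish'] (built on first access)
```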
RunResult
Unified result returned by Context.run() for all execution modes.
Provides a consistent interface regardless of config type (thread, weave,
loom) or execution mode (execute, validate, plan, preview). Mode-specific
fields are None when not applicable.
Attributes:
| Name | Type | Description |
|---|---|---|
| `status` | | Aggregate outcome of the run. |
| `mode` | | The execution mode that produced this result. |
| `config_type` | | Config kind that was executed (`"thread"`, `"weave"`, or `"loom"`). |
| `config_name` | | Name derived from the config file path. |
| `duration_ms` | | Wall-clock duration in milliseconds. |
| `detail` | | Underlying engine result (execute mode only). |
| `telemetry` | | Structured telemetry data (execute mode only). |
| `execution_plan` | | Resolved execution plans (plan mode only). |
| `preview_data` | | Output DataFrames keyed by thread name (preview mode only). |
| `validation_errors` | | Error messages from validation checks (validate mode only). |
| `warnings` | `list[str]` | Non-fatal messages (e.g., zero threads matched a filter). |
__init__(*, status, mode, config_type, config_name, duration_ms=0, detail=None, telemetry=None, execution_plan=None, preview_data=None, validation_errors=None, warnings=None)
Initialize a RunResult with execution outcome and telemetry.
summary()
Return a formatted, human-readable execution summary.
_ResolvedConfig (dataclass)
Intermediate container for resolved config data.
Context
Entry point for all weevr operations.
Wraps a SparkSession with a project reference, resolved parameters,
and execution configuration. Provides run() for execution and
load() for model inspection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession (required). | required |
| `project` | `str \| Path` | Project identifier. Accepts three forms. | required |
| `params` | `dict[str, Any] \| None` | Runtime parameter overrides. | `None` |
| `log_level` | `str` | Logging verbosity (a `LogLevel` value). | `'standard'` |
| `workspace` | `str \| None` | Fabric workspace ID for cross-lakehouse resolution. | `None` |
| `lakehouse` | `str \| None` | Fabric lakehouse ID for cross-lakehouse resolution. | `None` |
spark (property)
The underlying SparkSession.
params (property)
Runtime parameter overrides.
log_level (property)
Configured logging verbosity.
project_root (property)
Resolved project root path.
__init__(spark, project, *, params=None, log_level='standard', workspace=None, lakehouse=None)
Initialize a Context with a SparkSession and project reference.
load(path)
Load and validate a config file, returning a model wrapper.
Parses, resolves, and hydrates the config at `path` (relative to
the project root) without executing anything. The returned
`LoadedConfig` exposes the underlying model and, for
weave/loom configs, a lazily-built execution plan.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str \| Path` | Path to a config file, relative to the project root. | required |
Returns:
| Type | Description |
|---|---|
| `LoadedConfig` | A `LoadedConfig` wrapping the hydrated model. |
run(path, *, mode='execute', tags=None, threads=None, sample_rows=100)
Run a config file in the specified mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str \| Path` | Path to a config file, relative to the project root. | required |
| `mode` | `str` | Execution mode: `'execute'`, `'validate'`, `'plan'`, or `'preview'`. | `'execute'` |
| `tags` | `list[str] \| None` | Run only threads matching any of these tags. | `None` |
| `threads` | `list[str] \| None` | Run only threads with these names. | `None` |
| `sample_rows` | `int` | Maximum rows for preview mode. | `100` |
Returns:
| Type | Description |
|---|---|
| `RunResult` | A `RunResult` describing the outcome of the run. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If `mode` is invalid or both `tags` and `threads` are provided. |
apply_inheritance(loom_defaults, weave_defaults, thread_config)
Apply multi-level inheritance cascade.
Cascade order (lowest to highest priority):

1. `loom_defaults` (lowest)
2. `weave_defaults`
3. `thread_config` (highest)

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `loom_defaults` | `dict[str, Any] \| None` | Defaults from the loom level. | required |
| `weave_defaults` | `dict[str, Any] \| None` | Defaults from the weave level. | required |
| `thread_config` | `dict[str, Any]` | Thread-specific config. | required |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Fully merged config, with thread values taking precedence. |
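The cascade can be sketched as a left-to-right merge in which later layers override earlier ones. Whether weevr merges nested mappings recursively is not stated here; this sketch assumes a recursive merge of nested dicts, with lists and scalars replaced wholesale. `deep_merge` and `cascade` are hypothetical names, and the `target`/`tags` keys in the example are illustrative rather than weevr's schema.

```python
import copy
from typing import Optional


def deep_merge(base: dict, override: dict) -> dict:
    """Return a new dict with override's values layered on top of base."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)  # recurse into nested sections
        else:
            merged[key] = copy.deepcopy(value)  # scalars and lists replace outright
    return merged


def cascade(
    loom_defaults: Optional[dict],
    weave_defaults: Optional[dict],
    thread_config: dict,
) -> dict:
    """Apply loom -> weave -> thread precedence (thread values win)."""
    result: dict = {}
    for layer in (loom_defaults, weave_defaults, thread_config):
        if layer:
            result = deep_merge(result, layer)
    return result


merged = cascade(
    {"target": {"mode": "overwrite", "format": "delta"}, "tags": ["nightly"]},
    {"target": {"mode": "append"}},
    {"target": {"path": "Tables/sales"}, "tags": ["sales"]},
)
print(merged)
# {'target': {'mode': 'append', 'format': 'delta', 'path': 'Tables/sales'}, 'tags': ['sales']}
```

Deep-copying each layer keeps the caller's default dicts untouched, which matters when the same loom or weave defaults are reused for many threads.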
expand_foreach(steps)
Expand foreach macro blocks into repeated step sequences.
Each foreach block in the steps list is replaced by its template steps
repeated once per value, with {var} placeholders substituted.
Non-foreach entries pass through unchanged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `steps` | `list[dict[str, Any]]` | Raw step list (dicts), possibly containing `foreach` blocks. | required |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | Expanded step list with all `foreach` blocks replaced. |
Raises:
| Type | Description |
|---|---|
| `ConfigError` | If a `foreach` block is missing required fields. |
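A minimal sketch of the expansion. The shape of a `foreach` block is not documented here, so this assumes a hypothetical `{"foreach": {"values": [...], "as": "<var>", "steps": [...]}}` layout; the `derive` step type and the local `ConfigError` class are likewise stand-ins. Placeholders of the form `{var}` are substituted in every string inside the template steps, and non-foreach entries pass through unchanged.

```python
import copy
from typing import Any


class ConfigError(Exception):
    """Hypothetical local stand-in for weevr's ConfigError."""


def _substitute(obj: Any, var: str, value: Any) -> Any:
    """Recursively replace '{var}' placeholders inside strings."""
    if isinstance(obj, str):
        return obj.replace("{" + var + "}", str(value))
    if isinstance(obj, list):
        return [_substitute(item, var, value) for item in obj]
    if isinstance(obj, dict):
        return {key: _substitute(val, var, value) for key, val in obj.items()}
    return obj  # numbers, booleans, None are left as-is


def expand_foreach(steps: list[dict]) -> list[dict]:
    expanded: list[dict] = []
    for step in steps:
        if "foreach" not in step:
            expanded.append(copy.deepcopy(step))  # regular step: pass through
            continue
        block = step["foreach"]
        missing = [key for key in ("values", "as", "steps") if key not in block]
        if missing:
            raise ConfigError(f"foreach block missing required field(s): {missing}")
        # Repeat the whole template once per value, in declared order.
        for value in block["values"]:
            for template in block["steps"]:
                expanded.append(_substitute(template, block["as"], value))
    return expanded


steps = [
    {"filter": {"expr": "amount > 0"}},
    {"foreach": {"values": ["first_name", "last_name"], "as": "col",
                 "steps": [{"derive": {"column": "{col}_clean", "expr": "trim({col})"}}]}},
]
for step in expand_foreach(steps):
    print(step)
```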
detect_config_type_from_extension(path)
Detect config type from file extension.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str \| Path` | Path to the config file. | required |
Returns:
| Type | Description |
|---|---|
| `str \| None` | Config type string if the extension is a typed extension, else `None`. |
Raises:
| Type | Description |
|---|---|
| `ConfigError` | If the extension is … |
extract_config_version(raw)
Extract and parse config_version from a config dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `raw` | `dict[str, Any]` | Parsed config dictionary. | required |
Returns:
| Type | Description |
|---|---|
| `tuple[int, int]` | Tuple of `(major, minor)` version numbers. |
Raises:
| Type | Description |
|---|---|
| `ConfigParseError` | If `config_version` is missing or has an invalid format. |
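A sketch of the parsing step, assuming `config_version` is written as a `"major.minor"` value; the exact accepted formats are not documented here, and `ConfigParseError` below is a hypothetical local stand-in. Since YAML loads an unquoted `1.0` as a float, the value is normalized to text before matching.

```python
import re
from typing import Any


class ConfigParseError(Exception):
    """Hypothetical local stand-in for weevr's ConfigParseError."""


_VERSION_RE = re.compile(r"^(\d+)\.(\d+)$")


def extract_config_version(raw: dict[str, Any]) -> tuple[int, int]:
    if "config_version" not in raw:
        raise ConfigParseError("config_version is missing")
    # YAML may load 1.0 as a float, so normalize to text first.
    text = str(raw["config_version"]).strip()
    match = _VERSION_RE.match(text)
    if match is None:
        raise ConfigParseError(f"invalid config_version {text!r}; expected 'major.minor'")
    return int(match.group(1)), int(match.group(2))


print(extract_config_version({"config_version": "1.0"}))  # (1, 0)
```

One caveat of the float path: an unquoted `1.10` arrives as the float `1.1` and parses as `(1, 1)`, so quoting the version string in YAML is the safer convention.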
parse_yaml(path)
Parse a YAML file and return its contents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str \| Path` | Path to the YAML file. | required |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Parsed YAML content as a dictionary. |
Raises:
| Type | Description |
|---|---|
| `ConfigParseError` | If the file is not found, is unreadable, or contains invalid YAML syntax. |
validate_config_version(version, config_type)
Validate that the config version is supported.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `version` | `tuple[int, int]` | Tuple of `(major, minor)` version. | required |
| `config_type` | `str` | Type of config (`thread`, `weave`, `loom`, `params`). | required |
Raises:
| Type | Description |
|---|---|
| `ConfigVersionError` | If the major version doesn't match the supported version. |
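The check compares only the major component, so minor revisions within a supported major version are accepted. In the sketch below, `SUPPORTED_MAJOR = 1` is an assumption (the real supported version is not stated in this reference), and `ConfigVersionError` is a hypothetical local stand-in.

```python
class ConfigVersionError(Exception):
    """Hypothetical local stand-in for weevr's ConfigVersionError."""


SUPPORTED_MAJOR = 1  # assumption: not stated in this reference


def validate_config_version(version: tuple[int, int], config_type: str) -> None:
    major, minor = version
    # Only the major number must match; minor bumps are treated as compatible.
    if major != SUPPORTED_MAJOR:
        raise ConfigVersionError(
            f"{config_type} config_version {major}.{minor} is not supported; "
            f"expected major version {SUPPORTED_MAJOR}"
        )


validate_config_version((1, 4), "thread")  # passes silently
```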
build_param_context(runtime_params=None, config_defaults=None)
Build parameter context with proper priority layering.
Priority order (highest to lowest):

1. `runtime_params`
2. `config_defaults`

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `runtime_params` | `dict[str, Any] \| None` | Runtime parameter overrides. | `None` |
| `config_defaults` | `dict[str, Any] \| None` | Default parameters from config. | `None` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Merged parameter context dictionary with dotted key access support. |
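A minimal sketch of the layering and of dotted lookups. It assumes a shallow merge (a runtime value replaces a whole top-level key) and implements dotted access as a walk through nested dicts; weevr's actual merge depth and lookup rules are not documented here, and `lookup` is a hypothetical helper.

```python
from typing import Any, Optional


def build_param_context(
    runtime_params: Optional[dict[str, Any]] = None,
    config_defaults: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    # Later unpacking wins, so runtime params override config defaults.
    return {**(config_defaults or {}), **(runtime_params or {})}


def lookup(context: dict[str, Any], dotted_key: str) -> Any:
    """Resolve 'a.b.c' by walking nested dicts; raise KeyError if absent."""
    if dotted_key in context:  # an exact flat key takes priority
        return context[dotted_key]
    node: Any = context
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            raise KeyError(dotted_key)
        node = node[part]
    return node


ctx = build_param_context({"env": "prod"}, {"env": "dev", "lake": {"name": "bronze"}})
print(ctx["env"], lookup(ctx, "lake.name"))  # prod bronze
```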
resolve_references(config, config_type, project_root, runtime_params=None, visited=None)
Resolve references to other config files.
Handles both external references (ref key) and inline definitions
(name + body keys). Recursively loads referenced configs with
circular reference detection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `dict[str, Any]` | Config dict to resolve references in. | required |
| `config_type` | `str` | Type of this config. | required |
| `project_root` | `Path` | Absolute path to the project root. | required |
| `runtime_params` | `dict[str, Any] \| None` | Runtime parameters to pass to child configs. | `None` |
| `visited` | `set[str] \| None` | Set of already-visited ref strings (for cycle detection). | `None` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Config dict with resolved child configs attached. |
Raises:
| Type | Description |
|---|---|
| `ReferenceResolutionError` | If a referenced file is not found or a circular reference is detected. |
| `ConfigError` | If an inline definition is missing a required key. |
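The recursive loading with cycle detection can be sketched against an in-memory map standing in for the project directory. The `children` key, the `resolved_children` output key, and `resolve_refs` itself are hypothetical; the sketch only handles `ref`-style references, not inline definitions.

```python
from typing import Any


class ReferenceResolutionError(Exception):
    """Hypothetical local stand-in for weevr's ReferenceResolutionError."""


def resolve_refs(ref: str, files: dict[str, dict[str, Any]], path: tuple = ()) -> dict[str, Any]:
    """Load `ref` from an in-memory file map and resolve its children recursively.

    `path` holds the refs on the current branch, so meeting one of them
    again means the references form a cycle.
    """
    if ref in path:
        cycle = " -> ".join(path + (ref,))
        raise ReferenceResolutionError(f"circular reference: {cycle}")
    if ref not in files:
        raise ReferenceResolutionError(f"referenced file not found: {ref}")
    config = dict(files[ref])  # shallow copy so the input map is not mutated
    children = config.pop("children", [])
    config["resolved_children"] = [
        resolve_refs(child, files, path + (ref,)) for child in children
    ]
    return config


files = {
    "main": {"name": "main", "children": ["weave_a", "weave_b"]},
    "weave_a": {"name": "a", "children": ["shared"]},
    "weave_b": {"name": "b", "children": ["shared"]},
    "shared": {"name": "shared"},
}
tree = resolve_refs("main", files)
print([child["name"] for child in tree["resolved_children"]])  # ['a', 'b']
```

Tracking only the current branch means a config shared by two siblings (`shared` above) is not mistaken for a cycle; a single global `visited` set would flag it unless entries are removed on the way back up.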
resolve_variables(config, context)
Recursively resolve variable references in config.
Supports:

- `${var}`: simple variable reference (error if not found)
- `${var:-default}`: variable with fallback default

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `dict[str, Any] \| list[Any] \| str \| Any` | Config structure to resolve (dict, list, str, or primitive). | required |
| `context` | `dict[str, Any]` | Parameter context for variable lookup. | required |
Returns:
| Type | Description |
|---|---|
| `Any` | Config with all variables resolved. |
Raises:
| Type | Description |
|---|---|
| `VariableResolutionError` | If a variable is not found and no default is provided. |
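Both syntaxes can be handled with one regular expression and a recursive walk over dicts, lists, and strings. This is a sketch, not weevr's resolver: it looks names up in a flat context, stringifies substituted values, and does not allow `}` inside a default. `VariableResolutionError` here is a local stand-in.

```python
import re
from typing import Any


class VariableResolutionError(Exception):
    """Hypothetical local stand-in for weevr's VariableResolutionError."""


# Matches ${name} or ${name:-default}; group 2 is None when no default is given.
_VAR_RE = re.compile(r"\$\{([A-Za-z_][\w.]*)(?::-([^}]*))?\}")


def resolve_variables(config: Any, context: dict[str, Any]) -> Any:
    if isinstance(config, dict):
        return {key: resolve_variables(value, context) for key, value in config.items()}
    if isinstance(config, list):
        return [resolve_variables(item, context) for item in config]
    if not isinstance(config, str):
        return config  # numbers, booleans, None pass through unchanged

    def replace(match: "re.Match[str]") -> str:
        name, default = match.group(1), match.group(2)
        if name in context:
            return str(context[name])
        if default is not None:
            return default
        raise VariableResolutionError(f"variable {name!r} not found and no default given")

    return _VAR_RE.sub(replace, config)


ctx = {"env": "prod", "lake": "bronze"}
cfg = {"target": {"path": "Tables/${lake}_${env}"}, "mode": "${mode:-append}", "retries": 3}
print(resolve_variables(cfg, ctx))
# {'target': {'path': 'Tables/bronze_prod'}, 'mode': 'append', 'retries': 3}
```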
validate_schema(raw, config_type)
Validate a raw config dict against the appropriate pre-resolution schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `raw` | `dict[str, Any]` | Raw config dictionary (variables not yet resolved). | required |
| `config_type` | `str` | Type of config (`thread`, `weave`, `loom`, `params`). | required |
Returns:
| Type | Description |
|---|---|
| `BaseModel` | Validated Pydantic model instance. |
Raises:
| Type | Description |
|---|---|
| `ConfigSchemaError` | If validation fails. |
execute_thread(spark, thread, collector=None, parent_span_id=None, cached_lookups=None, weave_lookups=None)
Execute a single thread from sources through transforms to a Delta target.
Execution order:
1. Resolve lookup-based sources from cached or on-demand DataFrames.
2. Read all remaining declared sources into DataFrames.
   - For `incremental_watermark`: load the prior HWM, apply the filter, and capture the new HWM.
   - For `cdc`: read via CDF or a generic CDC source.
3. Set the primary (first) source as the working DataFrame.
4. Run pipeline steps against the working DataFrame.
5. Run validation rules (if configured) — quarantine or abort on failures.
6. Compute business keys and hashes if configured.
7. Resolve the target write path.
8. Apply target column mapping.
9. Write to the Delta target.
   - For `cdc`: use CDC merge routing instead of the standard write.
10. Persist watermark state (if applicable).
11. Run post-write assertions (if configured).
12. Build telemetry and return ThreadResult.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | required |
| `thread` | `Thread` | Thread configuration to execute. | required |
| `collector` | `SpanCollector \| None` | Optional span collector for telemetry. When provided, a thread span is created and finalized. | `None` |
| `parent_span_id` | `str \| None` | Optional parent span ID for trace tree linkage. | `None` |
| `cached_lookups` | `dict[str, Any] \| None` | Pre-materialized lookup DataFrames keyed by lookup name. | `None` |
| `weave_lookups` | `dict[str, Lookup] \| None` | Weave-level lookup definitions for on-demand resolution of non-materialized lookups. | `None` |
|
Returns:

| Type | Description |
|---|---|
| `ThreadResult` | A `ThreadResult` with the thread's outcome and optional telemetry. |
Raises:
| Type | Description |
|---|---|
| `ExecutionError` | If any step fails, with execution context (such as thread name and step index) attached. |
| `DataValidationError` | If a fatal-severity validation rule fails. |
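The incremental-watermark path in steps 2, 9, and 10 can be sketched with plain lists standing in for the source, the Delta target, and the watermark store. `read_incremental` and `run_thread` are hypothetical, and the strict `>` comparison is an assumption (an inclusive `>=` filter would re-read rows equal to the stored mark).

```python
from typing import Any, Optional


def read_incremental(
    rows: list[dict[str, Any]], column: str, prior_hwm: Optional[Any]
) -> tuple[list[dict[str, Any]], Optional[Any]]:
    """Return rows newer than the prior high-water mark, plus the new mark."""
    # With no stored mark (first run), every row is read.
    fresh = rows if prior_hwm is None else [r for r in rows if r[column] > prior_hwm]
    # The new mark is the max of the filtered batch; unchanged if the batch is empty.
    new_hwm = max((r[column] for r in fresh), default=prior_hwm)
    return fresh, new_hwm


def run_thread(
    source: list[dict[str, Any]],
    column: str,
    state: dict[str, Any],
    target: list[dict[str, Any]],
) -> None:
    prior = state.get("hwm")                                  # step 2: load prior HWM
    batch, new_hwm = read_incremental(source, column, prior)  # filter and capture
    target.extend(batch)                                      # step 9: write
    state["hwm"] = new_hwm                                    # step 10: persist after the write


state: dict[str, Any] = {}
target: list[dict[str, Any]] = []
source = [{"id": 1, "ts": 10}, {"id": 2, "ts": 20}]
run_thread(source, "ts", state, target)
source.append({"id": 3, "ts": 30})
run_thread(source, "ts", state, target)
print([row["id"] for row in target], state)  # [1, 2, 3] {'hwm': 30}
```

Persisting the mark only after the write means a failed write leaves the old mark in place, so the next run re-reads the same rows instead of skipping them.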
build_plan(weave_name, threads, thread_entries)
Build an execution plan from a weave's threads and declared/inferred dependencies.
Performs DAG construction, cycle detection, topological sort, and cache analysis
to produce an immutable `ExecutionPlan`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `weave_name` | `str` | Name of the weave being planned. | required |
| `threads` | `dict[str, Thread]` | Mapping of thread name to `Thread` model. | required |
| `thread_entries` | `list[ThreadEntry]` | Ordered thread entries from the weave config, which may carry explicit dependency declarations. | required |
Returns:
| Type | Description |
|---|---|
| `ExecutionPlan` | An immutable `ExecutionPlan`. |
Raises:
| Type | Description |
|---|---|
| `ConfigError` | If a circular dependency is detected (with the cycle path in the message), or if an explicit dependency references a thread that does not exist in the weave. |
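DAG construction, cycle detection, and topological sorting can be approximated with the standard library's `graphlib` (Python 3.9+). This sketch assumes the plan groups threads into waves that can run in parallel, as `execute_weave` consumes them; it omits dependency inference and cache analysis, and `plan_groups` plus the local `ConfigError` are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


class ConfigError(Exception):
    """Hypothetical local stand-in for weevr's ConfigError."""


def plan_groups(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group threads into waves; `dependencies` maps each thread to its upstreams."""
    known = set(dependencies)
    for name, deps in dependencies.items():
        unknown = deps - known
        if unknown:
            raise ConfigError(f"thread {name!r} depends on unknown thread(s): {sorted(unknown)}")
    sorter = TopologicalSorter(dependencies)
    try:
        sorter.prepare()  # raises CycleError if the graph contains a cycle
    except CycleError as exc:
        cycle = " -> ".join(exc.args[1])  # args[1] lists the nodes in the cycle
        raise ConfigError(f"circular dependency: {cycle}") from exc
    groups: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted for deterministic output
        groups.append(ready)
        sorter.done(*ready)
    return groups


deps = {
    "raw_orders": set(),
    "raw_customers": set(),
    "orders_clean": {"raw_orders"},
    "sales_fact": {"orders_clean", "raw_customers"},
}
print(plan_groups(deps))  # [['raw_customers', 'raw_orders'], ['orders_clean'], ['sales_fact']]
```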
execute_loom(spark, loom, weaves, threads, params=None)
Execute weaves sequentially in declared order.
Iterates the weaves listed in `loom.weaves`, builds an
`ExecutionPlan` (`weevr.engine.planner.ExecutionPlan`) for each, and executes them
via `execute_weave`. Creates a root span collector for telemetry
and passes it through the execution tree.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | required |
| `loom` | `Loom` | Loom configuration declaring the ordered list of weave names. | required |
| `weaves` | `dict[str, Weave]` | Mapping of weave name to `Weave` model. | required |
| `threads` | `dict[str, dict[str, Thread]]` | Nested mapping of weave name to thread name to `Thread` model. | required |
| `params` | `dict[str, Any] \| None` | Parameters for condition evaluation at weave and thread levels. | `None` |
Returns:
| Type | Description |
|---|---|
| `LoomResult` | A `LoomResult` with per-weave results. |
execute_weave(spark, plan, threads, collector=None, parent_span_id=None, thread_conditions=None, params=None, weave_span_label=None, pre_steps=None, post_steps=None, lookups=None, variables=None)
Execute threads according to the execution plan.
Processes each parallel group sequentially, submitting threads within a
group to a `ThreadPoolExecutor` (`concurrent.futures`) for concurrent
execution. Respects the `on_failure` config on each thread and manages the
cache lifecycle via `CacheManager` (`weevr.engine.cache_manager`).
When hooks are provided, the lifecycle is:
1. Initialize VariableContext from variables.
2. Materialize lookups (materialize=True).
3. Execute pre_steps.
4. Execute threads (with cached lookup DataFrames).
5. Execute post_steps.
6. Cleanup lookups.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | required |
| `plan` | `ExecutionPlan` | Immutable `ExecutionPlan` to execute. | required |
| `threads` | `dict[str, Thread]` | Mapping of thread name to `Thread` model. | required |
| `collector` | `SpanCollector \| None` | Optional span collector for telemetry. When provided, per-thread collectors are created and merged after execution. | `None` |
| `parent_span_id` | `str \| None` | Optional parent span ID for trace tree linkage. | `None` |
| `thread_conditions` | `dict[str, ConditionSpec] \| None` | Mapping of thread name to condition spec. Threads whose condition evaluates to False are skipped. | `None` |
| `params` | `dict[str, Any] \| None` | Parameters for condition evaluation. | `None` |
| `weave_span_label` | `str \| None` | Label for the weave telemetry span. When set, it is used instead of the default label. | `None` |
| `pre_steps` | `Sequence[HookStep] \| None` | Optional pre-execution hook steps. | `None` |
| `post_steps` | `Sequence[HookStep] \| None` | Optional post-execution hook steps. | `None` |
| `lookups` | `dict[str, Lookup] \| None` | Optional weave-level lookup definitions. | `None` |
| `variables` | `dict[str, VariableSpec] \| None` | Optional weave-level variable specs. | `None` |
Returns:
| Type | Description |
|---|---|
| `WeaveResult` | A `WeaveResult` with per-thread results. |
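The group-by-group scheduling can be sketched with `concurrent.futures.ThreadPoolExecutor`: groups run in order, threads inside a group run concurrently, and leaving the `with` block waits for the whole group before the next one starts. `run_groups` is hypothetical; it simplifies `on_failure` handling to "record the failure and continue" and models hooks and conditions as plain callables.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional


def run_groups(
    groups: list[list[str]],
    run_thread: Callable[[str], None],
    condition: Optional[Callable[[str], bool]] = None,
    pre_steps: Optional[list[Callable[[], None]]] = None,
    post_steps: Optional[list[Callable[[], None]]] = None,
    max_workers: int = 4,
) -> dict[str, str]:
    """Run groups sequentially and the threads inside each group concurrently."""
    results: dict[str, str] = {}
    for hook in pre_steps or []:
        hook()
    for group in groups:
        # Threads whose condition evaluates to False are skipped.
        to_run = [name for name in group if condition is None or condition(name)]
        for name in group:
            if name not in to_run:
                results[name] = "skipped"
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {name: pool.submit(run_thread, name) for name in to_run}
        # Exiting the `with` block waited for every future in this group.
        for name, future in futures.items():
            results[name] = "failed" if future.exception() else "succeeded"
    for hook in post_steps or []:
        hook()
    return results


def fake_thread(name: str) -> None:
    if name == "bad":
        raise RuntimeError("boom")


print(run_groups([["a", "bad"], ["c"]], fake_thread))
# {'a': 'succeeded', 'bad': 'failed', 'c': 'succeeded'}
```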
configure_logging(log_level=LogLevel.STANDARD, logger_name='weevr')
Configure a weevr logger with structured JSON output.
Sets up a logger with a `StructuredJsonFormatter` handler at the
appropriate Python logging level based on the weevr `LogLevel`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `log_level` | `LogLevel` | weevr log level to configure. | `STANDARD` |
| `logger_name` | `str` | Name of the logger to configure. | `'weevr'` |
Returns:
| Type | Description |
|---|---|
| `Logger` | Configured logger instance. |
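The same idea, a named logger with one JSON-emitting handler, can be sketched with the standard `logging` module. `configure_logging` maps weevr's `LogLevel` to a Python level internally, but that mapping is not documented here, so this sketch takes a Python level directly. `JsonFormatter` and `configure_json_logger` are hypothetical, and the JSON field names are illustrative rather than the output of weevr's `StructuredJsonFormatter`.

```python
import io
import json
import logging
from datetime import datetime, timezone
from typing import IO, Optional


class JsonFormatter(logging.Formatter):
    """Render each log record as one line of JSON."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def configure_json_logger(
    level: int, logger_name: str = "weevr", stream: Optional[IO[str]] = None
) -> logging.Logger:
    """Attach a single JSON handler to `logger_name` at a Python logging level."""
    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    for handler in list(logger.handlers):  # avoid duplicate handlers on repeat calls
        logger.removeHandler(handler)
    handler = logging.StreamHandler(stream)  # None means sys.stderr
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.propagate = False  # don't also emit through the root logger
    return logger


buffer = io.StringIO()
log = configure_json_logger(logging.INFO, "weevr.demo", stream=buffer)
log.info("thread %s finished", "sales_fact")
log.debug("suppressed at INFO level")
print(buffer.getvalue())
```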