Engine API
The weevr.engine module contains the execution planner and executor. It
resolves the thread dependency DAG, schedules execution order, and runs each
thread through its read-transform-write lifecycle.
weevr.engine
weevr engine — thread, weave, and loom execution orchestration.
__all__ = ['execute_thread', 'ThreadResult', 'ExecutionPlan', 'build_plan', 'execute_weave', 'WeaveResult', 'execute_loom', 'LoomResult', 'CacheManager']
module-attribute
CacheManager
Manages the lifecycle of cached DataFrames within a weave execution.
After a thread that is a cache target completes, the manager persists its output DataFrame so that multiple downstream consumers can read from memory/disk rather than re-scanning the Delta table. When all consumers of a cached thread have finished, the DataFrame is automatically unpersisted.
Cache failures are non-fatal — they degrade performance but never affect correctness (consumers fall back to reading from Delta directly).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cache_targets | list[str] | Thread names whose outputs should be cached. | required |
| dependents | dict[str, list[str]] | Maps each thread name to the list of threads that depend on it. | required |
__init__(cache_targets, dependents)
Initialize with target thread names and their dependency map.
is_cache_target(thread_name)
Return True if the thread's output should be cached.
persist(thread_name, spark, target_path)
Read and persist the thread's target DataFrame.
The DataFrame is read from target_path using the provided SparkSession
and persisted at MEMORY_AND_DISK level. If the operation fails for any
reason, a warning is logged and execution continues without caching.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_name | str | Name of the thread whose output to cache. | required |
| spark | SparkSession | Active SparkSession. | required |
| target_path | str | Delta path to read the DataFrame from. | required |
notify_complete(consumer_name)
Notify the manager that a consumer thread has completed.
When all consumers of a cached thread have finished, its cached DataFrame is automatically unpersisted. If unpersist fails, a warning is logged and the memory is eventually reclaimed by Spark's GC.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| consumer_name | str | Name of the thread that just completed. | required |
cleanup()
Force-unpersist all remaining cached DataFrames.
Called in a finally block to ensure caches are always released,
even when execution fails mid-way.
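The consumer-counting behaviour described for CacheManager can be sketched without Spark. The class below is a hypothetical stand-in, not the weevr implementation: `ConsumerTracker`, its `loader` argument, and the thread names are assumptions made for illustration, and deleting a dictionary entry stands in for unpersisting a DataFrame.

```python
class ConsumerTracker:
    """Hypothetical stand-in for the cache bookkeeping; no Spark involved."""

    def __init__(self, cache_targets, dependents):
        # Consumers still to finish, per cached thread (copied, so the input is untouched).
        self._pending = {t: set(dependents.get(t, [])) for t in cache_targets}
        self._cached = {}  # thread name -> cached object (a DataFrame in a real engine)

    def is_cache_target(self, thread_name):
        return thread_name in self._pending

    def persist(self, thread_name, loader):
        # A caching failure is non-fatal: warn and carry on without the cache.
        try:
            self._cached[thread_name] = loader()
        except Exception as exc:
            print(f"warning: could not cache {thread_name}: {exc}")

    def notify_complete(self, consumer_name):
        released = []
        for target, pending in self._pending.items():
            pending.discard(consumer_name)
            if not pending and target in self._cached:
                del self._cached[target]  # stands in for unpersisting the DataFrame
                released.append(target)
        return released

    def cleanup(self):
        # Release whatever is still cached, regardless of pending consumers.
        self._cached.clear()


tracker = ConsumerTracker(
    ["dim_customer"], {"dim_customer": ["fact_orders", "fact_returns"]}
)
try:
    tracker.persist("dim_customer", lambda: ["row1", "row2"])
    first = tracker.notify_complete("fact_orders")    # one consumer still pending
    second = tracker.notify_complete("fact_returns")  # last consumer: cache released
finally:
    tracker.cleanup()
```

Wrapping the run in try/finally mirrors the note above that cleanup is called in a finally block, so nothing stays cached if a thread raises part-way through.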
ExecutionPlan
Bases: FrozenBase
Immutable execution plan produced by the planner.
Attributes:
| Name | Type | Description |
|---|---|---|
| weave_name | str | Name of the weave this plan was built for. |
| threads | list[str] | All thread names in the weave. |
| dependencies | dict[str, list[str]] | Maps each thread to the list of threads it depends on. |
| dependents | dict[str, list[str]] | Maps each thread to the list of threads that depend on it. |
| execution_order | list[list[str]] | Parallel execution groups in topological order. Each inner list contains threads that can run concurrently. |
| cache_targets | list[str] | Thread names whose output should be cached because they feed two or more downstream threads. |
| inferred_dependencies | dict[str, list[str]] | The auto-inferred subset of |
| explicit_dependencies | dict[str, list[str]] | The explicitly declared subset of |
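The relationship between dependencies, dependents, and cache_targets can be illustrated with plain dictionaries. This is a minimal sketch under assumptions: the helper names `invert_dependencies` and `find_cache_targets` and the thread names are hypothetical, and the planner may derive these fields differently.

```python
def invert_dependencies(dependencies):
    """Turn 'thread -> what it depends on' into 'thread -> what depends on it'."""
    dependents = {name: [] for name in dependencies}
    for thread, upstreams in dependencies.items():
        for upstream in upstreams:
            dependents.setdefault(upstream, []).append(thread)
    return dependents


def find_cache_targets(dependents, min_consumers=2):
    """Threads feeding at least `min_consumers` downstream threads."""
    return [name for name, consumers in dependents.items() if len(consumers) >= min_consumers]


dependencies = {
    "stg_orders": [],
    "dim_customer": ["stg_orders"],
    "fact_orders": ["stg_orders", "dim_customer"],
}
dependents = invert_dependencies(dependencies)
cache_targets = find_cache_targets(dependents)
```

Here stg_orders feeds two downstream threads, so it is the only cache target; dim_customer has a single consumer and would be re-read directly.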
LoomResult
Bases: FrozenBase
Immutable record of a completed loom execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | Literal['success', 'failure', 'partial'] | Aggregate outcome. |
| loom_name | str | Name of the loom that was executed. |
| weave_results | list[WeaveResult] | Results for each weave that was executed. |
| duration_ms | int | Wall-clock duration of the loom execution in milliseconds. |
| telemetry | LoomTelemetry \| None | Loom-level telemetry aggregated from weave telemetry. |
ThreadResult
Bases: FrozenBase
Immutable record of a completed thread execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | Literal['success', 'failure', 'skipped'] | Outcome of the execution. |
| thread_name | str | Name of the thread that was executed. |
| rows_written | int | Number of rows in the DataFrame at write time. |
| write_mode | str | The write mode used ( |
| target_path | str | Physical path of the Delta table that was written. |
| telemetry | ThreadTelemetry \| None | Thread-level telemetry with validation/assertion results and row counts. |
| skip_reason | str \| None | The condition expression that caused the thread to be skipped. |
| error | str \| None | Error message when the thread failed, |
WeaveResult
Bases: FrozenBase
Immutable record of a completed weave execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | Literal['success', 'failure', 'partial', 'skipped'] | Aggregate outcome. |
| weave_name | str | Name of the weave that was executed. |
| thread_results | list[ThreadResult] | Results for each thread that was executed (not skipped). |
| threads_skipped | list[str] | Names of threads that were skipped due to upstream failure. |
| duration_ms | int | Wall-clock duration of the weave execution in milliseconds. |
| telemetry | WeaveTelemetry \| None | Weave-level telemetry aggregated from thread telemetry. |
| skip_reason | str \| None | The condition expression that caused the weave to be skipped. |
execute_thread(spark, thread, collector=None, parent_span_id=None, cached_lookups=None, weave_lookups=None)
Execute a single thread from sources through transforms to a Delta target.
Execution order:
1. Resolve lookup-based sources from cached or on-demand DataFrames.
2. Read all remaining declared sources into DataFrames.
   - For incremental_watermark: load the prior high-water mark (HWM), apply the filter, capture the new HWM.
   - For cdc: read via CDF or generic CDC source.
3. Set the primary (first) source as the working DataFrame.
4. Run pipeline steps against the working DataFrame.
5. Run validation rules (if configured); quarantine or abort on failures.
6. Compute business keys and hashes if configured.
7. Resolve the target write path.
8. Apply target column mapping.
9. Write to the Delta target.
   - For cdc: use CDC merge routing instead of standard write.
10. Persist watermark state (if applicable).
11. Run post-write assertions (if configured).
12. Build telemetry and return ThreadResult.
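The incremental_watermark handling in step 2, together with the state persisted in step 10, follows the usual high-water-mark pattern. The sketch below shows that pattern on plain Python rows rather than Spark DataFrames. `apply_watermark`, the `updated_at` column, and the strict greater-than comparison are all assumptions for illustration; the source does not say whether weevr's filter is strict or inclusive.

```python
def apply_watermark(rows, column, prior_hwm):
    """Keep rows newer than the prior high-water mark and report the new mark."""
    # Assumption: strictly greater than the prior mark; None means "first run".
    fresh = [r for r in rows if prior_hwm is None or r[column] > prior_hwm]
    # If nothing new arrived, the mark stays where it was.
    new_hwm = max((r[column] for r in fresh), default=prior_hwm)
    return fresh, new_hwm


rows = [
    {"id": 1, "updated_at": 1},
    {"id": 2, "updated_at": 3},
    {"id": 3, "updated_at": 5},
]

# First incremental run: the stored mark is 2, so rows 2 and 3 are picked up.
fresh, hwm = apply_watermark(rows, "updated_at", prior_hwm=2)

# Second run against the same data: nothing is newer than the stored mark.
fresh_again, hwm_again = apply_watermark(rows, "updated_at", prior_hwm=hwm)
```

Persisting the new mark only after the write succeeds, as the ordering of steps 9 and 10 suggests, means a failed write does not advance the watermark past unwritten rows.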
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| thread | Thread | Thread configuration to execute. | required |
| collector | SpanCollector \| None | Optional span collector for telemetry. When provided, a thread span is created and finalized. | None |
| parent_span_id | str \| None | Optional parent span ID for trace tree linkage. | None |
| cached_lookups | dict[str, Any] \| None | Pre-materialized lookup DataFrames keyed by lookup name. Runtime type is | None |
| weave_lookups | dict[str, Lookup] \| None | Weave-level lookup definitions for on-demand resolution of non-materialized lookups. | None |
Returns:
| Type | Description |
|---|---|
| ThreadResult | A ThreadResult, including optional telemetry. |
Raises:
| Type | Description |
|---|---|
| ExecutionError | If any step fails, with |
| DataValidationError | If a fatal-severity validation rule fails. |
build_plan(weave_name, threads, thread_entries)
Build an execution plan from a weave's threads and declared/inferred dependencies.
Performs DAG construction, cycle detection, topological sort, and cache analysis
to produce an immutable ExecutionPlan.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weave_name | str | Name of the weave being planned. | required |
| threads | dict[str, Thread] | Mapping of thread name to | required |
| thread_entries | list[ThreadEntry] | Ordered thread entries from the weave config, which may carry explicit dependency declarations. | required |
Returns:
| Type | Description |
|---|---|
| ExecutionPlan | An immutable |
Raises:
| Type | Description |
|---|---|
| ConfigError | If a circular dependency is detected (with cycle path in the message), or if an explicit dependency references a thread that does not exist in |
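The topological sort and cycle detection that build_plan performs can be illustrated with a layered variant of Kahn's algorithm, which yields groups of threads whose dependencies are all satisfied by earlier groups. This is a sketch of the general technique, not the planner's code: `plan_groups` is a hypothetical name, and ValueError stands in for ConfigError so the example needs no weevr import.

```python
def plan_groups(dependencies):
    """Group threads into layers; each layer depends only on earlier layers."""
    remaining = {name: set(deps) for name, deps in dependencies.items()}

    # Reject references to threads that were never declared.
    known = set(remaining)
    for name, deps in remaining.items():
        unknown = deps - known
        if unknown:
            raise ValueError(f"{name} depends on unknown thread(s): {sorted(unknown)}")

    groups = []
    while remaining:
        # Threads with no unmet dependencies can run together.
        ready = sorted(name for name, deps in remaining.items() if not deps)
        if not ready:
            # Everything left is waiting on something else left: a cycle.
            raise ValueError(f"circular dependency among: {sorted(remaining)}")
        groups.append(ready)
        for name in ready:
            del remaining[name]
        for deps in remaining.values():
            deps.difference_update(ready)
    return groups


groups = plan_groups({"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]})

try:
    plan_groups({"x": ["y"], "y": ["x"]})
    cycle_detected = False
except ValueError:
    cycle_detected = True
```

The result has the same shape as the execution_order attribute: a list of lists in which each inner list can run concurrently. Unlike the real planner, this sketch reports only the threads involved in a cycle, not the cycle path.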
execute_loom(spark, loom, weaves, threads, params=None)
Execute weaves sequentially in declared order.
Iterates the weaves listed in loom.weaves, builds a
weevr.engine.planner.ExecutionPlan for each, and executes them
via execute_weave. Creates a root span collector for telemetry
and passes it through the execution tree.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| loom | Loom | Loom configuration declaring the ordered list of weave names. | required |
| weaves | dict[str, Weave] | Mapping of weave name to | required |
| threads | dict[str, dict[str, Thread]] | Nested mapping of | required |
| params | dict[str, Any] \| None | Parameters for condition evaluation at weave and thread levels. | None |
Returns:
| Type | Description |
|---|---|
| LoomResult | A LoomResult with per-weave results. |
execute_weave(spark, plan, threads, collector=None, parent_span_id=None, thread_conditions=None, params=None, weave_span_label=None, pre_steps=None, post_steps=None, lookups=None, variables=None)
Execute threads according to the execution plan.
Processes each parallel group sequentially, submitting threads within a
group to a concurrent.futures.ThreadPoolExecutor for concurrent
execution. Respects on_failure config on each thread and manages the
cache lifecycle via weevr.engine.cache_manager.CacheManager.
When hooks are provided, the lifecycle is:
1. Initialize VariableContext from variables.
2. Materialize lookups (materialize=True).
3. Execute pre_steps.
4. Execute threads (with cached lookup DataFrames).
5. Execute post_steps.
6. Cleanup lookups.
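Step 4 of the lifecycle, running each parallel group in turn while threads within a group run concurrently, can be sketched with the standard library alone. `run_groups` and `run_thread` are hypothetical names, and the sketch deliberately omits on_failure handling, skipping of downstream threads, hooks, lookups, and caching.

```python
from concurrent.futures import ThreadPoolExecutor


def run_groups(execution_order, run_thread, max_workers=4):
    """Run groups one after another; threads inside a group run concurrently."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for group in execution_order:
            futures = {name: pool.submit(run_thread, name) for name in group}
            # Waiting on every future here is what keeps the groups sequential.
            for name, future in futures.items():
                try:
                    results[name] = ("success", future.result())
                except Exception as exc:
                    results[name] = ("failure", str(exc))
    return results


started = []


def run_thread(name):
    # Stand-in for executing one thread; "c" fails to show error capture.
    started.append(name)
    if name == "c":
        raise RuntimeError("boom")
    return name.upper()


results = run_groups([["a"], ["b", "c"]], run_thread)
```

Because every future in a group is awaited before the next group is submitted, "a" always starts before "b" and "c", while the relative order of "b" and "c" is not guaranteed.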
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| plan | ExecutionPlan | Immutable | required |
| threads | dict[str, Thread] | Mapping of thread name to | required |
| collector | SpanCollector \| None | Optional span collector for telemetry. When provided, per-thread collectors are created and merged after execution. | None |
| parent_span_id | str \| None | Optional parent span ID for trace tree linkage. | None |
| thread_conditions | dict[str, ConditionSpec] \| None | Mapping of thread name to condition spec. Threads with a condition that evaluates to False are skipped. | None |
| params | dict[str, Any] \| None | Parameters for condition evaluation. | None |
| weave_span_label | str \| None | Label for the weave telemetry span. When set, used instead of | None |
| pre_steps | Sequence[HookStep] \| None | Optional pre-execution hook steps. | None |
| post_steps | Sequence[HookStep] \| None | Optional post-execution hook steps. | None |
| lookups | dict[str, Lookup] \| None | Optional weave-level lookup definitions. | None |
| variables | dict[str, VariableSpec] \| None | Optional weave-level variable specs. | None |
Returns:
| Type | Description |
|---|---|
| WeaveResult | A WeaveResult with per-thread results. |