# Engine API

The `weevr.engine` module contains the execution planner and executor. It
resolves the thread dependency DAG, schedules execution order, and runs each
thread through its read-transform-write lifecycle. The engine also handles
pre/post weave hooks, lookup pre-materialization, conditional thread
execution, and weave-scoped variable binding.
## weevr.engine

weevr engine — thread, weave, and loom execution orchestration.

### `__all__` *(module attribute)*

`__all__ = ['execute_thread', 'ThreadResult', 'ExecutionPlan', 'build_plan', 'DAGDiagram', 'FlowDiagram', 'execute_weave', 'WeaveResult', 'execute_loom', 'LoomResult', 'CacheManager']`
### CacheManager

Manages the lifecycle of cached DataFrames within a weave execution.

After a thread that is a cache target completes, the manager persists its output DataFrame so that multiple downstream consumers can read from memory/disk rather than re-scanning the Delta table. When all consumers of a cached thread have finished, the DataFrame is automatically unpersisted.

Cache failures are non-fatal — they degrade performance but never affect correctness (consumers fall back to reading from Delta directly).

**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `cache_targets` | `list[str]` | Thread names whose outputs should be cached. | *required* |
| `dependents` | `dict[str, list[str]]` | Maps each thread name to the list of threads that depend on it. | *required* |
#### `__init__(cache_targets, dependents)`

Initialize with target thread names and their dependency map.
#### `is_cache_target(thread_name)`

Return `True` if the thread's output should be cached.
#### `persist(thread_name, spark, target_path)`

Read and persist the thread's target DataFrame.

The DataFrame is read from `target_path` using the provided SparkSession
and persisted at `MEMORY_AND_DISK` level. If the operation fails for any
reason, a warning is logged and execution continues without caching.

**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `thread_name` | `str` | Name of the thread whose output to cache. | *required* |
| `spark` | `SparkSession` | Active SparkSession. | *required* |
| `target_path` | `str` | Delta path to read the DataFrame from. | *required* |
#### `notify_complete(consumer_name)`

Notify the manager that a consumer thread has completed.

When all consumers of a cached thread have finished, its cached DataFrame is automatically unpersisted. If unpersist fails, a warning is logged and the memory is eventually reclaimed by Spark's GC.

**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `consumer_name` | `str` | Name of the thread that just completed. | *required* |
#### `cleanup()`

Force-unpersist all remaining cached DataFrames.

Called in a `finally` block to ensure caches are always released,
even when execution fails mid-way.
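The consumer-counting lifecycle described above can be sketched in plain Python. This is illustrative only, not weevr's implementation: `CacheManagerSketch` is a hypothetical name, and a plain dict stands in for Spark `persist()`/`unpersist()` so the logic is self-contained.

```python
# Illustrative sketch of the consumer-counting cache lifecycle. A plain
# dict stands in for Spark persistence; real code would call df.persist()
# and df.unpersist() on DataFrames.

class CacheManagerSketch:
    def __init__(self, cache_targets, dependents):
        self.cache_targets = set(cache_targets)
        # Remaining consumers for each cached thread.
        self.pending = {t: set(dependents.get(t, [])) for t in cache_targets}
        self.cache = {}

    def is_cache_target(self, thread_name):
        return thread_name in self.cache_targets

    def persist(self, thread_name, df):
        try:
            self.cache[thread_name] = df  # stand-in for df.persist(...)
        except Exception:
            pass  # non-fatal: consumers fall back to reading from Delta

    def get(self, thread_name):
        # None means "not cached" -> consumer reads from Delta directly.
        return self.cache.get(thread_name)

    def notify_complete(self, consumer_name):
        for target, consumers in self.pending.items():
            consumers.discard(consumer_name)
            if not consumers and target in self.cache:
                del self.cache[target]  # stand-in for df.unpersist()

    def cleanup(self):
        self.cache.clear()  # force-release everything in a finally block
```

For example, with `dependents={"silver": ["gold_a", "gold_b"]}`, the cached output of `silver` is released only after both `gold_a` and `gold_b` report completion.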
### DAGDiagram

Inline SVG diagram of a weave execution plan.

Auto-renders in notebooks via the `_repr_svg_()` display protocol. SVG markup
is available as a string via `__str__()` or the `svg` property. Export to a
file via `save()`.

**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `svg` | `str` | The raw SVG markup string. | *required* |

#### `svg` *(property)*

The raw SVG markup.

#### `__init__(svg)`

Initialize with SVG markup.

#### `__str__()`

Return SVG markup as a string.

#### `save(path)`

Write SVG to a file.

**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | File path to write. Parent directory must exist. | *required* |

**Raises:**

| Type | Description |
|---|---|
| `FileNotFoundError` | If the parent directory does not exist. |
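The display-protocol pattern that DAGDiagram (and FlowDiagram below) follow can be sketched as below. `SvgDiagram` is a hypothetical stand-in, not weevr's class; the key point is that Jupyter's rich display machinery looks for a method literally named `_repr_svg_`.

```python
# Illustrative SVG wrapper following the notebook display protocol.

class SvgDiagram:
    """Notebooks render instances inline by calling _repr_svg_()."""

    def __init__(self, svg: str):
        self._svg = svg

    @property
    def svg(self) -> str:
        return self._svg

    def __str__(self) -> str:
        return self._svg

    def _repr_svg_(self) -> str:
        # IPython/Jupyter probes for this exact method name.
        return self._svg

    def save(self, path: str) -> None:
        # open() raises FileNotFoundError when the parent directory
        # is missing, matching the documented behavior.
        with open(path, "w", encoding="utf-8") as f:
            f.write(self._svg)
```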
### FlowDiagram

Inline SVG diagram of a thread's processing pipeline.

Visualizes the flow from sources through transforms to the target.
Auto-renders in notebooks via the `_repr_svg_()` display protocol.

**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `svg` | `str` | The raw SVG markup string. | *required* |

#### `svg` *(property)*

The raw SVG markup.

#### `__init__(svg)`

Initialize with SVG markup.

#### `__str__()`

Return SVG markup as a string.

#### `save(path)`

Write SVG to a file.

**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | File path to write. Parent directory must exist. | *required* |

**Raises:**

| Type | Description |
|---|---|
| `FileNotFoundError` | If the parent directory does not exist. |
### ExecutionPlan

Bases: `FrozenBase`

Immutable execution plan produced by the planner.

**Attributes:**

| Name | Type | Description |
|---|---|---|
| `weave_name` | `str` | Name of the weave this plan was built for. |
| `threads` | `list[str]` | All thread names in the weave. |
| `dependencies` | `dict[str, list[str]]` | Maps each thread to the list of threads it depends on. |
| `dependents` | `dict[str, list[str]]` | Maps each thread to the list of threads that depend on it. |
| `execution_order` | `list[list[str]]` | Parallel execution groups in topological order. Each inner list contains threads that can run concurrently. |
| `cache_targets` | `list[str]` | Thread names whose output should be cached because they feed two or more downstream threads. |
| `inferred_dependencies` | `dict[str, list[str]]` | The auto-inferred subset of `dependencies`. |
| `explicit_dependencies` | `dict[str, list[str]]` | The explicitly declared subset of `dependencies`. |
| `lookup_schedule` | `dict[int, list[str]] \| None` | Maps execution group indices to lookups that should be materialized at that boundary. Key 0 means before the first group; key N (N > 0) means after group N-1 finishes. |
| `lookup_producers` | `dict[str, str \| None] \| None` | Maps lookup names to the thread that produces their source data, or `None` when no thread in the weave produces it. |
| `lookup_consumers` | `dict[str, list[str]] \| None` | Maps lookup names to the list of threads that consume each lookup. Only populated when lookups are provided to the planner. |
#### `dag(*, dark=None)`

Return an inline SVG DAG diagram of this execution plan.

The returned :class:`DAGDiagram` auto-renders in notebooks
via `_repr_svg_()` and can be exported via `save()`.

**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `dark` | `bool \| None` | Force dark (`True`) or light (`False`) rendering; `None` uses the default. | `None` |
### LoomResult

Bases: `FrozenBase`

Immutable record of a completed loom execution.

**Attributes:**

| Name | Type | Description |
|---|---|---|
| `status` | `Literal['success', 'failure', 'partial']` | Aggregate outcome — `'success'` when every weave succeeded, `'failure'` when none did, `'partial'` when only some succeeded. |
| `loom_name` | `str` | Name of the loom that was executed. |
| `weave_results` | `list[WeaveResult]` | Results for each weave that was executed. |
| `duration_ms` | `int` | Wall-clock duration of the loom execution in milliseconds. |
| `telemetry` | `LoomTelemetry \| None` | Loom-level telemetry aggregated from weave telemetry. |
### ThreadResult

Bases: `FrozenBase`

Immutable record of a completed thread execution.

**Attributes:**

| Name | Type | Description |
|---|---|---|
| `status` | `Literal['success', 'failure', 'skipped']` | Outcome of the execution — `'success'`, `'failure'`, or `'skipped'`. |
| `thread_name` | `str` | Name of the thread that was executed. |
| `rows_written` | `int` | Number of rows in the DataFrame at write time. |
| `write_mode` | `str` | The write mode used for the Delta write. |
| `target_path` | `str` | Physical path of the Delta table that was written. |
| `telemetry` | `ThreadTelemetry \| None` | Thread-level telemetry with validation/assertion results and row counts. |
| `skip_reason` | `str \| None` | The condition expression that caused the thread to be skipped. |
| `error` | `str \| None` | Error message when the thread failed, `None` otherwise. |
| `output_schema` | `list[tuple[str, str]] \| None` | Column names and Spark data types captured from the output DataFrame before write. Each tuple is `(column_name, data_type)`. |
| `samples` | `dict[str, list[dict[str, Any]]] \| None` | Data samples captured before write, keyed by category. |
| `drift_report` | `dict[str, Any] \| None` | Schema drift report captured before write when schema drift detection is active. |
| `warp_findings` | `list[dict[str, str]] \| None` | Warp enforcement findings from this run — each entry names a column and the mismatch reason. |
### WeaveResult

Bases: `FrozenBase`

Immutable record of a completed weave execution.

**Attributes:**

| Name | Type | Description |
|---|---|---|
| `status` | `Literal['success', 'failure', 'partial', 'skipped']` | Aggregate outcome — `'success'`, `'failure'`, `'partial'` (mixed outcomes), or `'skipped'`. |
| `weave_name` | `str` | Name of the weave that was executed. |
| `thread_results` | `list[ThreadResult]` | Results for each thread that was executed (not skipped). |
| `threads_skipped` | `list[str]` | Names of threads that were skipped due to upstream failure. |
| `duration_ms` | `int` | Wall-clock duration of the weave execution in milliseconds. |
| `telemetry` | `WeaveTelemetry \| None` | Weave-level telemetry aggregated from thread telemetry. |
| `skip_reason` | `str \| None` | The condition expression that caused the weave to be skipped. |
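One plausible way the `'success'`/`'failure'`/`'partial'` literals above relate to per-thread outcomes is sketched below. This is an assumption for illustration, not weevr's actual aggregation code; in particular, treating "no successes" as `'failure'` and a mix as `'partial'` is inferred from the literal names.

```python
# Illustrative aggregation of per-thread statuses into a weave-level
# status. The exact rules in weevr may differ; this sketch assumes
# 'partial' means a mix of successful and unsuccessful threads.

def aggregate_status(thread_statuses: list[str]) -> str:
    if not thread_statuses:
        return 'skipped'
    ok = sum(s == 'success' for s in thread_statuses)
    if ok == len(thread_statuses):
        return 'success'
    if ok == 0:
        return 'failure'
    return 'partial'
```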
### `execute_thread(spark, thread, collector=None, parent_span_id=None, cached_lookups=None, weave_lookups=None, resolved_params=None, loom_name='', weave_name='', column_sets=None, column_set_defs=None, connections=None)`

Execute a single thread from sources through transforms to a Delta target.

Execution order:

1. Initialize thread-level resources (variables, lookups, column_sets).
2. Run thread-level `pre_steps`.
3. Resolve lookup-based sources from cached or on-demand DataFrames.
4. Read all remaining declared sources into DataFrames.
    - For `incremental_watermark`: load prior HWM, apply filter, capture new HWM.
    - For `cdc`: read via CDF or generic CDC source.
5. Set the primary (first) source as the working DataFrame.
6. Run pipeline steps against the working DataFrame.
7. Run validation rules (if configured) — quarantine or abort on failures.
8. Apply naming normalization (if configured).
9. Compute business keys and hashes if configured.
10. Resolve the target write path.
11. Apply target column mapping.
12. Inject audit columns (if configured; bypasses mapping mode).
13. Write to the Delta target.
    - For `cdc`: use CDC merge routing instead of standard write.
14. Persist watermark state (if applicable).
15. Run post-write assertions (if configured).
16. Write exports (secondary outputs, if configured).
17. Run thread-level `post_steps`.
18. Build telemetry and return `ThreadResult`.
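Stripped of Spark, validation, hooks, and telemetry, the core of this lifecycle is a read-transform-write loop. The sketch below is purely illustrative: `run_thread` is a hypothetical helper, and lists of dicts stand in for DataFrames.

```python
# Minimal read-transform-write skeleton mirroring steps 4-6 and 13 above.
# Lists of dicts stand in for Spark DataFrames; the reader and writer
# callables stand in for Delta reads and writes.

def run_thread(sources, steps, write):
    # Step 4: read all declared sources.
    frames = {name: read() for name, read in sources.items()}
    # Step 5: the first source becomes the working DataFrame.
    df = next(iter(frames.values()))
    # Step 6: apply pipeline steps in order; each step may join
    # against the other source frames.
    for step in steps:
        df = step(df, frames)
    # Step 13: write the result to the target.
    write(df)
    return df
```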
**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | *required* |
| `thread` | `Thread` | Thread configuration to execute. | *required* |
| `collector` | `SpanCollector \| None` | Optional span collector for telemetry. When provided, a thread span is created and finalized. | `None` |
| `parent_span_id` | `str \| None` | Optional parent span ID for trace tree linkage. | `None` |
| `cached_lookups` | `dict[str, Any] \| None` | Pre-materialized lookup DataFrames keyed by lookup name. | `None` |
| `weave_lookups` | `dict[str, Lookup] \| None` | Weave-level lookup definitions for on-demand resolution of non-materialized lookups. | `None` |
| `resolved_params` | `dict[str, Any] \| None` | Runtime parameter values for telemetry capture. Stored on `ThreadTelemetry` for thread-level runs. | `None` |
| `loom_name` | `str` | Loom name for audit column context variables. | `''` |
| `weave_name` | `str` | Weave name for audit column context variables. | `''` |
| `column_sets` | `dict[str, dict[str, str]] \| None` | Pre-resolved column set mappings keyed by name. Passed through to rename step resolution. | `None` |
| `column_set_defs` | `dict[str, ColumnSet] \| None` | Column set model instances keyed by name. Passed through to rename step resolution. | `None` |
| `connections` | `dict[str, OneLakeConnection] \| None` | Named connection declarations keyed by connection name. When provided, source reads, lookup materialization, target path resolution, and export writes use connection-based paths where configured. | `None` |
**Returns:**

| Type | Description |
|---|---|
| `ThreadResult` | A :class:`ThreadResult` with the execution outcome and optional telemetry. |

**Raises:**

| Type | Description |
|---|---|
| `ExecutionError` | If any step fails. |
| `DataValidationError` | If a fatal-severity validation rule fails. |
| `ExportError` | If an export write fails. |
### `build_plan(weave_name, threads, thread_entries, lookups=None)`

Build an execution plan from a weave's threads and declared/inferred dependencies.

Performs DAG construction, cycle detection, topological sort, and cache analysis
to produce an immutable :class:`ExecutionPlan`.

When lookups are provided, the planner also:

- Infers implicit dependencies between threads mediated by lookups (a thread consuming a lookup whose source is produced by another thread depends on that producer).
- Computes a lookup schedule mapping each execution group boundary to the lookups that should be materialized before that group runs.
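The DAG construction, cycle detection, and grouping described above amount to a layered topological sort. The following is a self-contained sketch of that idea, not weevr's planner code:

```python
# Layered topological sort: produces parallel execution groups shaped
# like ExecutionPlan.execution_order and raises on cycles. Illustrative.

def plan_groups(dependencies: dict[str, list[str]]) -> list[list[str]]:
    remaining = {t: set(deps) for t, deps in dependencies.items()}
    groups = []
    while remaining:
        # Threads with no unsatisfied dependencies can run concurrently.
        ready = sorted(t for t, deps in remaining.items() if not deps)
        if not ready:
            # No thread is runnable but some remain: a cycle exists.
            raise ValueError(f"circular dependency among: {sorted(remaining)}")
        groups.append(ready)
        for t in ready:
            del remaining[t]
        # Mark this group's threads as satisfied for everyone downstream.
        for deps in remaining.values():
            deps -= set(ready)
    return groups
```

For a diamond-shaped weave (`bronze` feeds `silver`, which feeds `gold_a` and `gold_b`), this yields three groups, with the two gold threads sharing the last group; the same shape would also mark `silver` as a cache target, since it feeds two downstream threads.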
**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `weave_name` | `str` | Name of the weave being planned. | *required* |
| `threads` | `dict[str, Thread]` | Mapping of thread name to :class:`Thread` configuration. | *required* |
| `thread_entries` | `list[ThreadEntry]` | Ordered thread entries from the weave config, which may carry explicit dependency declarations. | *required* |
| `lookups` | `dict[str, Lookup] \| None` | Optional weave-level lookup definitions. When provided, lookup-mediated dependencies are inferred and a materialization schedule is computed. | `None` |

**Returns:**

| Type | Description |
|---|---|
| `ExecutionPlan` | An immutable :class:`ExecutionPlan`. |

**Raises:**

| Type | Description |
|---|---|
| `ConfigError` | If a circular dependency is detected (with the cycle path in the message), or if an explicit dependency references a thread that does not exist in `threads`. |
### `execute_loom(spark, loom, weaves, threads, params=None)`

Execute weaves sequentially in declared order.

Iterates the weaves listed in `loom.weaves`, builds an
:class:`~weevr.engine.planner.ExecutionPlan` for each, and executes them
via :func:`execute_weave`. Creates a root span collector for telemetry
and passes it through the execution tree.
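Conceptually this is a sequential plan-then-execute loop with status aggregation. The sketch below is illustrative only; `run_loom` is a hypothetical stand-in, and the `plan`/`run` callables stand in for `build_plan` and `execute_weave`.

```python
# Illustrative loom driver: plan and execute each weave in declared
# order, then aggregate per-weave statuses. Dicts stand in for the
# LoomResult/WeaveResult models.

def run_loom(weave_order, plan, run):
    results = [run(plan(name)) for name in weave_order]  # sequential
    ok = sum(r["status"] == "success" for r in results)
    status = ("success" if ok == len(results)
              else "failure" if ok == 0
              else "partial")
    return {"status": status, "weave_results": results}
```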
**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | *required* |
| `loom` | `Loom` | Loom configuration declaring the ordered list of weave names. | *required* |
| `weaves` | `dict[str, Weave]` | Mapping of weave name to :class:`Weave` configuration. | *required* |
| `threads` | `dict[str, dict[str, Thread]]` | Nested mapping of weave name to thread name to :class:`Thread`. | *required* |
| `params` | `dict[str, Any] \| None` | Parameters for condition evaluation at weave and thread levels. | `None` |

**Returns:**

| Type | Description |
|---|---|
| `LoomResult` | A :class:`LoomResult` with the aggregate status and per-weave results. |
### `execute_weave(spark, plan, threads, collector=None, parent_span_id=None, thread_conditions=None, params=None, weave_span_label=None, pre_steps=None, post_steps=None, lookups=None, variables=None, loom_name='', weave_name='', column_set_defs=None, pre_cached_lookups=None, connections=None)`

Execute threads according to the execution plan.

Processes each parallel group sequentially, submitting threads within a
group to a :class:`~concurrent.futures.ThreadPoolExecutor` for concurrent
execution. Respects `on_failure` config on each thread and manages the
cache lifecycle via :class:`~weevr.engine.cache_manager.CacheManager`.

When hooks are provided, the lifecycle is:

1. Initialize `VariableContext` from `variables`.
2. Execute `pre_steps`.
3. Materialize lookups (`materialize=True`).
4. Materialize column sets.
5. Execute threads (with cached lookup DataFrames).
6. Execute `post_steps`.
7. Cleanup lookups.
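The group-wise execution model (sequential groups, concurrent threads within a group) can be sketched with the standard library. This is illustrative, not weevr's code; `run_groups` and `run_thread` are hypothetical names.

```python
# Illustrative group-wise execution: groups run one after another in
# topological order, while threads within a group run concurrently on
# a ThreadPoolExecutor, mirroring ExecutionPlan.execution_order.
from concurrent.futures import ThreadPoolExecutor

def run_groups(execution_order, run_thread):
    results = {}
    with ThreadPoolExecutor() as pool:
        for group in execution_order:
            futures = {name: pool.submit(run_thread, name) for name in group}
            # Barrier: the next group starts only after every thread in
            # this group has finished.
            for name, fut in futures.items():
                results[name] = fut.result()
    return results
```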
**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | *required* |
| `plan` | `ExecutionPlan` | Immutable :class:`ExecutionPlan` produced by `build_plan`. | *required* |
| `threads` | `dict[str, Thread]` | Mapping of thread name to :class:`Thread` configuration. | *required* |
| `collector` | `SpanCollector \| None` | Optional span collector for telemetry. When provided, per-thread collectors are created and merged after execution. | `None` |
| `parent_span_id` | `str \| None` | Optional parent span ID for trace tree linkage. | `None` |
| `thread_conditions` | `dict[str, ConditionSpec] \| None` | Mapping of thread name to condition spec. Threads with a condition that evaluates to False are skipped. | `None` |
| `params` | `dict[str, Any] \| None` | Parameters for condition evaluation. | `None` |
| `weave_span_label` | `str \| None` | Label for the weave telemetry span. When set, used instead of the plan's weave name. | `None` |
| `pre_steps` | `Sequence[HookStep] \| None` | Optional pre-execution hook steps. | `None` |
| `post_steps` | `Sequence[HookStep] \| None` | Optional post-execution hook steps. | `None` |
| `lookups` | `dict[str, Lookup] \| None` | Optional weave-level lookup definitions. | `None` |
| `variables` | `dict[str, VariableSpec] \| None` | Optional weave-level variable specs. | `None` |
| `loom_name` | `str` | Loom name passed to threads for audit column context. | `''` |
| `weave_name` | `str` | Weave name passed to threads for audit column context. | `''` |
| `column_set_defs` | `dict[str, ColumnSet] \| None` | Merged column set definitions (loom-level merged with weave-level, weave wins) to be materialized and forwarded to each thread for rename step resolution. | `None` |
| `pre_cached_lookups` | `dict[str, Any] \| None` | Already-materialized lookup DataFrames from a parent scope (e.g. loom level). Seeded into the weave's cached lookup dict so threads can reference them without re-materializing. | `None` |
| `connections` | `dict[str, OneLakeConnection] \| None` | Merged loom + weave connections forwarded to `execute_thread`. | `None` |
**Returns:**

| Type | Description |
|---|---|
| `WeaveResult` | A :class:`WeaveResult` with the aggregate status and per-thread results. |