
Engine API

The weevr.engine module contains the execution planner and executor. It resolves the thread dependency DAG, schedules execution order, and runs each thread through its read-transform-write lifecycle.

weevr.engine

weevr engine — thread, weave, and loom execution orchestration.

__all__ = ['execute_thread', 'ThreadResult', 'ExecutionPlan', 'build_plan', 'execute_weave', 'WeaveResult', 'execute_loom', 'LoomResult', 'CacheManager']

CacheManager

Manages the lifecycle of cached DataFrames within a weave execution.

After a thread that is a cache target completes, the manager persists its output DataFrame so that multiple downstream consumers can read from memory/disk rather than re-scanning the Delta table. When all consumers of a cached thread have finished, the DataFrame is automatically unpersisted.

Cache failures are non-fatal — they degrade performance but never affect correctness (consumers fall back to reading from Delta directly).

Parameters:

cache_targets (list[str], required)
    Thread names whose outputs should be cached.

dependents (dict[str, list[str]], required)
    Maps each thread name to the list of threads that depend on it.

__init__(cache_targets, dependents)

Initialize with target thread names and their dependency map.

is_cache_target(thread_name)

Return True if the thread's output should be cached.

persist(thread_name, spark, target_path)

Read and persist the thread's target DataFrame.

The DataFrame is read from target_path using the provided SparkSession and persisted at MEMORY_AND_DISK level. If the operation fails for any reason, a warning is logged and execution continues without caching.

Parameters:

thread_name (str, required)
    Name of the thread whose output to cache.

spark (SparkSession, required)
    Active SparkSession.

target_path (str, required)
    Delta path to read the DataFrame from.

notify_complete(consumer_name)

Notify the manager that a consumer thread has completed.

When all consumers of a cached thread have finished, its cached DataFrame is automatically unpersisted. If unpersist fails, a warning is logged and the memory is eventually reclaimed by Spark's GC.

Parameters:

consumer_name (str, required)
    Name of the thread that just completed.

cleanup()

Force-unpersist all remaining cached DataFrames.

Called in a finally block to ensure caches are always released, even when execution fails mid-way.
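The consumer-tracking behavior described above can be sketched in plain Python. This is an illustrative stand-in, not the real implementation: caching is simulated with a dict rather than Spark persist/unpersist calls, and the class name is hypothetical.

```python
# Minimal sketch of CacheManager's reference-counting idea: a cached
# thread's output is released once every dependent consumer has finished.
# Plain-Python stand-in; the real manager operates on Spark DataFrames.

class CacheManagerSketch:
    def __init__(self, cache_targets, dependents):
        self.cache_targets = set(cache_targets)
        # Remaining (unfinished) consumers for each cache target.
        self._remaining = {t: set(dependents.get(t, [])) for t in cache_targets}
        self.cached = {}  # thread_name -> cached payload

    def is_cache_target(self, thread_name):
        return thread_name in self.cache_targets

    def persist(self, thread_name, payload):
        # In the real manager a failure here is logged, not raised:
        # caching degrades performance but never correctness.
        self.cached[thread_name] = payload

    def notify_complete(self, consumer_name):
        # Drop the finished consumer from every wait set; release a cache
        # entry once no consumers remain.
        for target, waiting in self._remaining.items():
            waiting.discard(consumer_name)
            if not waiting and target in self.cached:
                del self.cached[target]

    def cleanup(self):
        # Force-release everything, mirroring the finally-block usage.
        self.cached.clear()


mgr = CacheManagerSketch(
    cache_targets=["bronze_orders"],
    dependents={"bronze_orders": ["silver_orders", "silver_refunds"]},
)
mgr.persist("bronze_orders", "df-placeholder")
mgr.notify_complete("silver_orders")
assert "bronze_orders" in mgr.cached       # one consumer still pending
mgr.notify_complete("silver_refunds")
assert "bronze_orders" not in mgr.cached   # all consumers done, released
```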

ExecutionPlan

Bases: FrozenBase

Immutable execution plan produced by the planner.

Attributes:

weave_name (str)
    Name of the weave this plan was built for.

threads (list[str])
    All thread names in the weave.

dependencies (dict[str, list[str]])
    Maps each thread to the list of threads it depends on.

dependents (dict[str, list[str]])
    Maps each thread to the list of threads that depend on it.

execution_order (list[list[str]])
    Parallel execution groups in topological order. Each inner list contains threads that can run concurrently.

cache_targets (list[str])
    Thread names whose output should be cached because they feed two or more downstream threads.

inferred_dependencies (dict[str, list[str]])
    The auto-inferred subset of dependencies, determined by matching target paths to source paths across threads.

explicit_dependencies (dict[str, list[str]])
    The explicitly declared subset of dependencies, sourced from ThreadEntry.dependencies in the weave config.
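The path-matching behind inferred_dependencies can be illustrated with a small sketch: thread B is inferred to depend on thread A when one of B's source paths equals A's target path. The dict structures and thread names here are stand-ins, not the real Thread config model.

```python
# Illustrative path-based dependency inference (assumed structures):
# each thread lists its source paths and a single target path.

threads = {
    "bronze_orders": {"sources": ["/landing/orders"], "target": "/bronze/orders"},
    "silver_orders": {"sources": ["/bronze/orders"], "target": "/silver/orders"},
    "gold_revenue":  {"sources": ["/silver/orders"], "target": "/gold/revenue"},
}

# Which thread writes each target path.
target_owner = {cfg["target"]: name for name, cfg in threads.items()}

# A thread depends on whichever thread owns one of its source paths.
inferred = {
    name: sorted(
        target_owner[src]
        for src in cfg["sources"]
        if src in target_owner and target_owner[src] != name
    )
    for name, cfg in threads.items()
}

assert inferred == {
    "bronze_orders": [],
    "silver_orders": ["bronze_orders"],
    "gold_revenue": ["silver_orders"],
}
```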

LoomResult

Bases: FrozenBase

Immutable record of a completed loom execution.

Attributes:

status (Literal['success', 'failure', 'partial'])
    Aggregate outcome — "success" if all weaves succeeded, "failure" if the loom stopped after a weave failure, "partial" if some weaves succeeded before a failure.

loom_name (str)
    Name of the loom that was executed.

weave_results (list[WeaveResult])
    Results for each weave that was executed.

duration_ms (int)
    Wall-clock duration of the loom execution in milliseconds.

telemetry (LoomTelemetry | None)
    Loom-level telemetry aggregated from weave telemetry.

ThreadResult

Bases: FrozenBase

Immutable record of a completed thread execution.

Attributes:

status (Literal['success', 'failure', 'skipped'])
    Outcome of the execution — "success", "failure", or "skipped" (when a condition evaluated to False).

thread_name (str)
    Name of the thread that was executed.

rows_written (int)
    Number of rows in the DataFrame at write time.

write_mode (str)
    The write mode used ("overwrite", "append", or "merge").

target_path (str)
    Physical path of the Delta table that was written.

telemetry (ThreadTelemetry | None)
    Thread-level telemetry with validation/assertion results and row counts.

skip_reason (str | None)
    The condition expression that caused the thread to be skipped.

error (str | None)
    Error message when the thread failed, None on success or skip.

WeaveResult

Bases: FrozenBase

Immutable record of a completed weave execution.

Attributes:

status (Literal['success', 'failure', 'partial', 'skipped'])
    Aggregate outcome — "success" if all threads succeeded, "failure" if all threads failed or were skipped, "partial" if some succeeded and some failed or were skipped, "skipped" if the weave was conditionally skipped at loom level.

weave_name (str)
    Name of the weave that was executed.

thread_results (list[ThreadResult])
    Results for each thread that was executed (not skipped).

threads_skipped (list[str])
    Names of threads that were skipped due to upstream failure.

duration_ms (int)
    Wall-clock duration of the weave execution in milliseconds.

telemetry (WeaveTelemetry | None)
    Weave-level telemetry aggregated from thread telemetry.

skip_reason (str | None)
    The condition expression that caused the weave to be skipped.

execute_thread(spark, thread, collector=None, parent_span_id=None, cached_lookups=None, weave_lookups=None)

Execute a single thread from sources through transforms to a Delta target.

Execution order:

1. Resolve lookup-based sources from cached or on-demand DataFrames.
2. Read all remaining declared sources into DataFrames.
   - For incremental_watermark: load the prior HWM, apply the filter, capture the new HWM.
   - For cdc: read via CDF or a generic CDC source.
3. Set the primary (first) source as the working DataFrame.
4. Run pipeline steps against the working DataFrame.
5. Run validation rules (if configured) — quarantine or abort on failures.
6. Compute business keys and hashes if configured.
7. Resolve the target write path.
8. Apply target column mapping.
9. Write to the Delta target.
   - For cdc: use CDC merge routing instead of the standard write.
10. Persist watermark state (if applicable).
11. Run post-write assertions (if configured).
12. Build telemetry and return ThreadResult.
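The incremental_watermark handling in step 2 can be sketched in plain Python. The function name, row dicts, and the way the high-water mark (HWM) is passed in are illustrative assumptions; the real engine applies the filter to Spark DataFrames and persists the HWM only after a successful write (step 10).

```python
# Sketch of incremental-watermark reading: keep only rows strictly newer
# than the prior HWM, and capture the new HWM to persist after the write.

def incremental_read(rows, watermark_col, prior_hwm):
    # No prior HWM means a first run: take everything.
    fresh = [r for r in rows if prior_hwm is None or r[watermark_col] > prior_hwm]
    # New HWM is the max watermark seen; unchanged if nothing was fresh.
    new_hwm = max((r[watermark_col] for r in fresh), default=prior_hwm)
    return fresh, new_hwm

rows = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-03"},
    {"id": 3, "updated_at": "2024-01-05"},
]
fresh, new_hwm = incremental_read(rows, "updated_at", prior_hwm="2024-01-02")
assert [r["id"] for r in fresh] == [2, 3]
assert new_hwm == "2024-01-05"
```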

Parameters:

spark (SparkSession, required)
    Active SparkSession.

thread (Thread, required)
    Thread configuration to execute.

collector (SpanCollector | None, default None)
    Optional span collector for telemetry. When provided, a thread span is created and finalized.

parent_span_id (str | None, default None)
    Optional parent span ID for trace tree linkage.

cached_lookups (dict[str, Any] | None, default None)
    Pre-materialized lookup DataFrames keyed by lookup name. The runtime type is dict[str, DataFrame]; Any avoids coupling the signature to PySpark types.

weave_lookups (dict[str, Lookup] | None, default None)
    Weave-level lookup definitions for on-demand resolution of non-materialized lookups.

Returns:

ThreadResult
    A ThreadResult with status, row count, write mode, target path, and optional telemetry.

Raises:

ExecutionError
    If any step fails, with thread_name set on the error.

DataValidationError
    If a fatal-severity validation rule fails.

build_plan(weave_name, threads, thread_entries)

Build an execution plan from a weave's threads and declared/inferred dependencies.

Performs DAG construction, cycle detection, topological sort, and cache analysis to produce an immutable ExecutionPlan.

Parameters:

weave_name (str, required)
    Name of the weave being planned.

threads (dict[str, Thread], required)
    Mapping of thread name to Thread config.

thread_entries (list[ThreadEntry], required)
    Ordered thread entries from the weave config, which may carry explicit dependency declarations.

Returns:

ExecutionPlan
    An immutable ExecutionPlan.

Raises:

ConfigError
    If a circular dependency is detected (the cycle path is included in the message), or if an explicit dependency references a thread that does not exist in threads.
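The planning pass can be sketched as a Kahn-style leveling: peel off all threads whose dependencies are satisfied to form one parallel group, repeat until done, and flag any thread with two or more dependents as a cache target. The function name and structures are illustrative; the real build_plan also merges explicit and inferred dependencies and reports the cycle path.

```python
# Sketch of planning: topological leveling into parallel groups, cycle
# detection, and cache-target analysis (a thread feeding 2+ dependents).

def build_plan_sketch(dependencies):
    # Invert the dependency map to find each thread's dependents.
    dependents = {t: [] for t in dependencies}
    for t, deps in dependencies.items():
        for d in deps:
            dependents[d].append(t)

    remaining = dict(dependencies)
    order = []
    while remaining:
        # A thread is ready when none of its dependencies are still pending.
        ready = sorted(t for t, deps in remaining.items()
                       if not any(d in remaining for d in deps))
        if not ready:  # nothing runnable left -> circular dependency
            raise ValueError(f"circular dependency among: {sorted(remaining)}")
        order.append(ready)
        for t in ready:
            del remaining[t]

    cache_targets = sorted(t for t, cs in dependents.items() if len(cs) >= 2)
    return order, cache_targets


deps = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
order, cache_targets = build_plan_sketch(deps)
assert order == [["a"], ["b", "c"], ["d"]]   # "b" and "c" can run concurrently
assert cache_targets == ["a"]                # "a" feeds both "b" and "c"
```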

execute_loom(spark, loom, weaves, threads, params=None)

Execute weaves sequentially in declared order.

Iterates the weaves listed in loom.weaves, builds an ExecutionPlan for each, and executes them via execute_weave. Creates a root span collector for telemetry and passes it through the execution tree.

Parameters:

spark (SparkSession, required)
    Active SparkSession.

loom (Loom, required)
    Loom configuration declaring the ordered list of weave names.

weaves (dict[str, Weave], required)
    Mapping of weave name to Weave config.

threads (dict[str, dict[str, Thread]], required)
    Nested mapping of weave_name → thread_name → Thread config.

params (dict[str, Any] | None, default None)
    Parameters for condition evaluation at weave and thread levels.

Returns:

LoomResult
    A LoomResult with aggregate status and per-weave results.
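One plausible reading of the LoomResult status rules can be expressed as a small helper. This is an interpretation of the documented semantics, not the engine's actual code; the statuses are plain strings standing in for WeaveResult objects.

```python
# Sketch of loom-level status aggregation: "success" when every weave
# succeeded, "partial" when some succeeded before a failure, "failure"
# when the loom stopped with no successful weaves.

def loom_status(weave_statuses):
    if all(s == "success" for s in weave_statuses):
        return "success"
    if any(s == "success" for s in weave_statuses):
        return "partial"
    return "failure"

assert loom_status(["success", "success"]) == "success"
assert loom_status(["success", "failure"]) == "partial"
assert loom_status(["failure"]) == "failure"
```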

execute_weave(spark, plan, threads, collector=None, parent_span_id=None, thread_conditions=None, params=None, weave_span_label=None, pre_steps=None, post_steps=None, lookups=None, variables=None)

Execute threads according to the execution plan.

Processes each parallel group sequentially, submitting threads within a group to a ThreadPoolExecutor for concurrent execution. Respects the on_failure config on each thread and manages the cache lifecycle via CacheManager.

When hooks are provided, the lifecycle is:

1. Initialize VariableContext from variables.
2. Materialize lookups (materialize=True).
3. Execute pre_steps.
4. Execute threads (with cached lookup DataFrames).
5. Execute post_steps.
6. Clean up lookups.

Parameters:

spark (SparkSession, required)
    Active SparkSession.

plan (ExecutionPlan, required)
    Immutable ExecutionPlan produced by build_plan.

threads (dict[str, Thread], required)
    Mapping of thread name to Thread config.

collector (SpanCollector | None, default None)
    Optional span collector for telemetry. When provided, per-thread collectors are created and merged after execution.

parent_span_id (str | None, default None)
    Optional parent span ID for trace tree linkage.

thread_conditions (dict[str, ConditionSpec] | None, default None)
    Mapping of thread name to condition spec. Threads whose condition evaluates to False are skipped.

params (dict[str, Any] | None, default None)
    Parameters for condition evaluation.

weave_span_label (str | None, default None)
    Label for the weave telemetry span. When set, it is used instead of plan.weave_name. Typically the weave's qualified key.

pre_steps (Sequence[HookStep] | None, default None)
    Optional pre-execution hook steps.

post_steps (Sequence[HookStep] | None, default None)
    Optional post-execution hook steps.

lookups (dict[str, Lookup] | None, default None)
    Optional weave-level lookup definitions.

variables (dict[str, VariableSpec] | None, default None)
    Optional weave-level variable specs.

Returns:

WeaveResult
    A WeaveResult with aggregate status and per-thread results.
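The group-wise scheduling described above can be sketched with the standard library's ThreadPoolExecutor: each parallel group is submitted to the pool, and the next group starts only after the current one has fully completed. The run callable is a stand-in for the real per-thread executor; on_failure handling, skips, and cache management are omitted.

```python
# Sketch of execute_weave's scheduling: parallel groups run concurrently,
# with a barrier between groups so dependencies are always satisfied.

from concurrent.futures import ThreadPoolExecutor

def execute_groups(execution_order, run):
    results = {}
    with ThreadPoolExecutor() as pool:
        for group in execution_order:
            futures = {name: pool.submit(run, name) for name in group}
            for name, fut in futures.items():  # barrier between groups
                results[name] = fut.result()
    return results


order = [["a"], ["b", "c"], ["d"]]
results = execute_groups(order, run=lambda name: f"{name}: success")
assert results == {n: f"{n}: success" for n in "abcd"}
```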