Engine API

The weevr.engine module contains the execution planner and executor. It resolves the thread dependency DAG, schedules execution order, and runs each thread through its read-transform-write lifecycle. The engine also handles pre/post weave hooks, lookup pre-materialization, conditional thread execution, and weave-scoped variable binding.

weevr.engine

weevr engine — thread, weave, and loom execution orchestration.

__all__ = ['execute_thread', 'ThreadResult', 'ExecutionPlan', 'build_plan', 'DAGDiagram', 'FlowDiagram', 'execute_weave', 'WeaveResult', 'execute_loom', 'LoomResult', 'CacheManager'] module-attribute

CacheManager

Manages the lifecycle of cached DataFrames within a weave execution.

After a thread that is a cache target completes, the manager persists its output DataFrame so that multiple downstream consumers can read from memory/disk rather than re-scanning the Delta table. When all consumers of a cached thread have finished, the DataFrame is automatically unpersisted.

Cache failures are non-fatal — they degrade performance but never affect correctness (consumers fall back to reading from Delta directly).

Parameters:

Name Type Description Default
cache_targets list[str]

Thread names whose outputs should be cached.

required
dependents dict[str, list[str]]

Maps each thread name to the list of threads that depend on it.

required

__init__(cache_targets, dependents)

Initialize with target thread names and their dependency map.

is_cache_target(thread_name)

Return True if the thread's output should be cached.

persist(thread_name, spark, target_path)

Read and persist the thread's target DataFrame.

The DataFrame is read from target_path using the provided SparkSession and persisted at MEMORY_AND_DISK level. If the operation fails for any reason, a warning is logged and execution continues without caching.

Parameters:

Name Type Description Default
thread_name str

Name of the thread whose output to cache.

required
spark SparkSession

Active SparkSession.

required
target_path str

Delta path to read the DataFrame from.

required

notify_complete(consumer_name)

Notify the manager that a consumer thread has completed.

When all consumers of a cached thread have finished, its cached DataFrame is automatically unpersisted. If unpersist fails, a warning is logged and the memory is eventually reclaimed by Spark's GC.

Parameters:

Name Type Description Default
consumer_name str

Name of the thread that just completed.

required

cleanup()

Force-unpersist all remaining cached DataFrames.

Called in a finally block to ensure caches are always released, even when execution fails mid-way.
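The consumer-countdown lifecycle described above can be sketched with plain dicts standing in for Spark DataFrames. This is an illustrative model only (the real CacheManager persists DataFrames at MEMORY_AND_DISK and reads them from Delta paths); `MiniCacheManager` is a hypothetical name, but the method signatures mirror the ones documented here.

```python
# Minimal sketch of the consumer-countdown cache lifecycle described above.
# Plain dicts stand in for Spark DataFrames; the real CacheManager persists
# DataFrames at MEMORY_AND_DISK and reads them from Delta paths.

class MiniCacheManager:
    def __init__(self, cache_targets, dependents):
        self.cache_targets = set(cache_targets)
        # Remaining consumers per cached thread (reference counts).
        self.pending = {t: set(dependents.get(t, [])) for t in cache_targets}
        self.cached = {}          # thread_name -> cached "DataFrame"

    def is_cache_target(self, thread_name):
        return thread_name in self.cache_targets

    def persist(self, thread_name, df):
        # Failures here would be logged and swallowed: caching is best-effort.
        self.cached[thread_name] = df

    def notify_complete(self, consumer_name):
        # Drop a cache once every consumer of its producer has finished.
        for producer, consumers in self.pending.items():
            consumers.discard(consumer_name)
            if not consumers and producer in self.cached:
                del self.cached[producer]   # unpersist

    def cleanup(self):
        self.cached.clear()                 # force-unpersist everything


mgr = MiniCacheManager(["bronze"], {"bronze": ["silver_a", "silver_b"]})
mgr.persist("bronze", {"rows": 3})
mgr.notify_complete("silver_a")
assert "bronze" in mgr.cached        # silver_b still pending
mgr.notify_complete("silver_b")
assert "bronze" not in mgr.cached    # all consumers done -> unpersisted
```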

DAGDiagram

Inline SVG diagram of a weave execution plan.

Auto-renders in notebooks via the _repr_svg_() display protocol. The SVG markup is available as a string via __str__() or the svg property; export to a file via save().

Parameters:

Name Type Description Default
svg str

The raw SVG markup string.

required

svg property

The raw SVG markup.

__init__(svg)

Initialize with SVG markup.

__str__()

Return SVG markup as string.

save(path)

Write SVG to a file.

Parameters:

Name Type Description Default
path str

File path to write. Parent directory must exist.

required

Raises:

Type Description
FileNotFoundError

If the parent directory does not exist.
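The display and export behavior documented above can be sketched as a minimal wrapper class. `MiniDiagram` is a hypothetical stand-in for DAGDiagram/FlowDiagram; note that a plain `open()` call already raises FileNotFoundError when the parent directory is missing, matching the documented behavior of save().

```python
# Minimal sketch of the diagram wrapper described above: holds raw SVG,
# auto-renders in notebooks via _repr_svg_(), and writes to disk via save().

class MiniDiagram:
    def __init__(self, svg: str):
        self._svg = svg

    @property
    def svg(self) -> str:
        return self._svg

    def __str__(self) -> str:
        return self._svg

    def _repr_svg_(self) -> str:
        # Jupyter calls this display hook to render the object inline.
        return self._svg

    def save(self, path: str) -> None:
        # open() raises FileNotFoundError if the parent directory is
        # missing, matching the documented behavior of save().
        with open(path, "w", encoding="utf-8") as fh:
            fh.write(self._svg)


d = MiniDiagram("<svg xmlns='http://www.w3.org/2000/svg'/>")
assert str(d) == d.svg
```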

FlowDiagram

Inline SVG diagram of a thread's processing pipeline.

Visualizes the flow from sources through transforms to the target. Auto-renders in notebooks via the _repr_svg_() display protocol.

Parameters:

Name Type Description Default
svg str

The raw SVG markup string.

required

svg property

The raw SVG markup.

__init__(svg)

Initialize with SVG markup.

__str__()

Return SVG markup as string.

save(path)

Write SVG to a file.

Parameters:

Name Type Description Default
path str

File path to write. Parent directory must exist.

required

Raises:

Type Description
FileNotFoundError

If the parent directory does not exist.

ExecutionPlan

Bases: FrozenBase

Immutable execution plan produced by the planner.

Attributes:

Name Type Description
weave_name str

Name of the weave this plan was built for.

threads list[str]

All thread names in the weave.

dependencies dict[str, list[str]]

Maps each thread to the list of threads it depends on.

dependents dict[str, list[str]]

Maps each thread to the list of threads that depend on it.

execution_order list[list[str]]

Parallel execution groups in topological order. Each inner list contains threads that can run concurrently.

cache_targets list[str]

Thread names whose output should be cached because they feed two or more downstream threads.

inferred_dependencies dict[str, list[str]]

The auto-inferred subset of dependencies, determined by matching target paths to source paths across threads.

explicit_dependencies dict[str, list[str]]

The explicitly declared subset of dependencies, sourced from ThreadEntry.dependencies in the weave config.

lookup_schedule dict[int, list[str]] | None

Maps execution group indices to lookups that should be materialized at that boundary. Key 0 means before the first group; key N (N > 0) means after group N-1 finishes. None when build_plan was called without lookups.

lookup_producers dict[str, str | None] | None

Maps lookup names to the thread that produces their source data, or None for external lookups. Only populated when build_plan was called with lookups.

lookup_consumers dict[str, list[str]] | None

Maps lookup names to the list of threads that consume each lookup. Only populated when build_plan was called with lookups.
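How execution_order groups and cache_targets relate to the dependency map can be sketched with a level-by-level topological sort. This is illustrative only, not the planner's implementation; `plan_groups` and `cache_targets` are hypothetical helpers.

```python
# Sketch of how execution_order groups and cache_targets can be derived
# from a dependency map (thread -> threads it depends on). Illustrative
# only; the real planner lives in weevr.engine.

def plan_groups(dependencies):
    remaining = set(dependencies)
    done, groups = set(), []
    while remaining:
        # Every thread whose dependencies are all satisfied can run now.
        ready = sorted(t for t in remaining
                       if all(d in done for d in dependencies[t]))
        if not ready:
            raise ValueError(f"circular dependency among: {sorted(remaining)}")
        groups.append(ready)
        done |= set(ready)
        remaining -= set(ready)
    return groups

def cache_targets(dependencies):
    # A thread's output is cached when two or more threads consume it.
    counts = {}
    for deps in dependencies.values():
        for d in deps:
            counts[d] = counts.get(d, 0) + 1
    return sorted(t for t, n in counts.items() if n >= 2)

deps = {"bronze": [], "silver_a": ["bronze"], "silver_b": ["bronze"],
        "gold": ["silver_a", "silver_b"]}
assert plan_groups(deps) == [["bronze"], ["silver_a", "silver_b"], ["gold"]]
assert cache_targets(deps) == ["bronze"]
```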

dag(*, dark=None)

Return an inline SVG DAG diagram of this execution plan.

The returned DAGDiagram auto-renders in notebooks via _repr_svg_() and can be exported via save().

Parameters:

Name Type Description Default
dark bool | None

Force dark (True) or light (False) palette. None uses prefers-color-scheme media query (default).

None

LoomResult

Bases: FrozenBase

Immutable record of a completed loom execution.

Attributes:

Name Type Description
status Literal['success', 'failure', 'partial']

Aggregate outcome — "success" if all weaves succeeded, "failure" if loom stopped after a weave failure, "partial" if some weaves succeeded before a failure.

loom_name str

Name of the loom that was executed.

weave_results list[WeaveResult]

Results for each weave that was executed.

duration_ms int

Wall-clock duration of the loom execution in milliseconds.

telemetry LoomTelemetry | None

Loom-level telemetry aggregated from weave telemetry.

ThreadResult

Bases: FrozenBase

Immutable record of a completed thread execution.

Attributes:

Name Type Description
status Literal['success', 'failure', 'skipped']

Outcome of the execution — "success", "failure", or "skipped" (when a condition evaluated to False).

thread_name str

Name of the thread that was executed.

rows_written int

Number of rows in the DataFrame at write time.

write_mode str

The write mode used ("overwrite", "append", or "merge").

target_path str

Physical path of the Delta table that was written.

telemetry ThreadTelemetry | None

Thread-level telemetry with validation/assertion results and row counts.

skip_reason str | None

The condition expression that caused the thread to be skipped.

error str | None

Error message when the thread failed, None on success or skip.

output_schema list[tuple[str, str]] | None

Column names and Spark data types captured from the output DataFrame before write. Each tuple is (column_name, type_string).

samples dict[str, list[dict[str, Any]]] | None

Data samples captured before write, keyed by category. Contains "output" (up to 10 rows) and optionally "quarantine" (up to 10 rows from the quarantine DataFrame).

drift_report dict[str, Any] | None

Schema drift report captured before write when schema drift detection is active. None when no drift handling is configured.

warp_findings list[dict[str, str]] | None

Warp enforcement findings from this run — each entry names a column and the mismatch reason. None when warp enforcement is not configured for the target.
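ThreadResult inherits from FrozenBase, so instances are immutable records. Assuming FrozenBase behaves like a frozen dataclass, a subset of the fields above can be sketched as follows (`MiniThreadResult` is a hypothetical stand-in):

```python
from dataclasses import dataclass, FrozenInstanceError
from typing import Optional

# Sketch of an immutable result record in the spirit of ThreadResult.
# FrozenBase is assumed to behave like a frozen dataclass; field names
# follow a subset of the attributes documented above.

@dataclass(frozen=True)
class MiniThreadResult:
    status: str                      # "success" | "failure" | "skipped"
    thread_name: str
    rows_written: int = 0
    write_mode: str = "overwrite"
    target_path: str = ""
    error: Optional[str] = None

r = MiniThreadResult(status="success", thread_name="bronze", rows_written=42)
try:
    r.rows_written = 0               # frozen: assignment raises
except FrozenInstanceError:
    pass
```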

WeaveResult

Bases: FrozenBase

Immutable record of a completed weave execution.

Attributes:

Name Type Description
status Literal['success', 'failure', 'partial', 'skipped']

Aggregate outcome — "success" if all threads succeeded or were conditionally skipped (no failures), "failure" if all threads failed or were skipped due to upstream failure, "partial" if some threads succeeded and some failed, "skipped" if the weave was conditionally skipped at loom level.

weave_name str

Name of the weave that was executed.

thread_results list[ThreadResult]

Results for each thread that was executed (not skipped).

threads_skipped list[str]

Names of threads that were skipped due to upstream failure.

duration_ms int

Wall-clock duration of the weave execution in milliseconds.

telemetry WeaveTelemetry | None

Weave-level telemetry aggregated from thread telemetry.

skip_reason str | None

The condition expression that caused the weave to be skipped.
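The aggregation rules for status above can be condensed into a small pure function. This is a sketch of the documented rules, not the library's implementation; in particular, threads skipped due to upstream failure are assumed to count toward failure, while conditional skips do not.

```python
# Sketch of the aggregate-status rules documented above for WeaveResult.
# Inputs: statuses of executed threads plus names of threads skipped due
# to upstream failure. Illustrative helper only.

def aggregate_status(thread_statuses, threads_skipped):
    # thread_statuses values: "success" | "failure" | "skipped"
    # ("skipped" here means a conditional skip, which never counts as failure)
    failed = thread_statuses.count("failure") + len(threads_skipped)
    succeeded = thread_statuses.count("success")
    if failed == 0:
        return "success"
    if succeeded == 0:
        return "failure"
    return "partial"

assert aggregate_status(["success", "skipped"], []) == "success"
assert aggregate_status(["failure"], ["downstream"]) == "failure"
assert aggregate_status(["success", "failure"], []) == "partial"
```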

execute_thread(spark, thread, collector=None, parent_span_id=None, cached_lookups=None, weave_lookups=None, resolved_params=None, loom_name='', weave_name='', column_sets=None, column_set_defs=None, connections=None)

Execute a single thread from sources through transforms to a Delta target.

Execution order:

1. Initialize thread-level resources (variables, lookups, column_sets).
2. Run thread-level pre_steps.
3. Resolve lookup-based sources from cached or on-demand DataFrames.
4. Read all remaining declared sources into DataFrames.
   - For incremental_watermark: load prior HWM, apply filter, capture new HWM.
   - For cdc: read via CDF or generic CDC source.
5. Set the primary (first) source as the working DataFrame.
6. Run pipeline steps against the working DataFrame.
7. Run validation rules (if configured): quarantine or abort on failures.
8. Apply naming normalization (if configured).
9. Compute business keys and hashes if configured.
10. Resolve the target write path.
11. Apply target column mapping.
12. Inject audit columns (if configured; bypasses mapping mode).
13. Write to the Delta target.
    - For cdc: use CDC merge routing instead of standard write.
14. Persist watermark state (if applicable).
15. Run post-write assertions (if configured).
16. Write exports (secondary outputs, if configured).
17. Run thread-level post_steps.
18. Build telemetry and return ThreadResult.
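The read-transform-write lifecycle above can be condensed into a skeleton. Every helper below is a stand-in (the real implementation operates on Spark DataFrames and handles watermarks, validation, and telemetry); only the ordering of the steps mirrors the documentation.

```python
# Condensed skeleton of the read-transform-write lifecycle listed above.
# Every helper here is a stand-in; the real implementation lives in
# weevr.engine and operates on Spark DataFrames.

def run_thread(sources, steps, write, pre_steps=(), post_steps=()):
    for hook in pre_steps:                 # 2. thread-level pre_steps
        hook()
    frames = {name: read() for name, read in sources.items()}  # 3-4. read sources
    df = next(iter(frames.values()))       # 5. primary source is the working frame
    for step in steps:                     # 6. pipeline steps
        df = step(df)
    rows_written = write(df)               # 10-13. resolve path, map, write
    for hook in post_steps:                # 17. thread-level post_steps
        hook()
    return {"status": "success", "rows_written": rows_written}  # 18. result

result = run_thread(
    sources={"orders": lambda: [{"qty": 2}, {"qty": 0}]},
    steps=[lambda rows: [r for r in rows if r["qty"] > 0]],    # filter transform
    write=lambda rows: len(rows),
)
assert result == {"status": "success", "rows_written": 1}
```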

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
thread Thread

Thread configuration to execute.

required
collector SpanCollector | None

Optional span collector for telemetry. When provided, a thread span is created and finalized.

None
parent_span_id str | None

Optional parent span ID for trace tree linkage.

None
cached_lookups dict[str, Any] | None

Pre-materialized lookup DataFrames keyed by lookup name. Runtime type is dict[str, DataFrame]; Any avoids coupling the signature to PySpark types.

None
weave_lookups dict[str, Lookup] | None

Weave-level lookup definitions for on-demand resolution of non-materialized lookups.

None
resolved_params dict[str, Any] | None

Runtime parameter values for telemetry capture. Stored on ThreadTelemetry for thread-level runs.

None
loom_name str

Loom name for audit column context variables.

''
weave_name str

Weave name for audit column context variables. Derived from the prefix of thread.qualified_key when not provided. Falls back to empty string if no dot separator exists.

''
column_sets dict[str, dict[str, str]] | None

Pre-resolved column set mappings keyed by name. Passed through to run_pipeline so rename steps can look up their resolved dictionaries.

None
column_set_defs dict[str, ColumnSet] | None

Column set model instances keyed by name. Passed through to run_pipeline so rename steps can read on_unmapped and on_extra settings.

None
connections dict[str, OneLakeConnection] | None

Named connection declarations keyed by connection name. When provided, source reads, lookup materialization, target path resolution, and export writes use connection-based paths where configured.

None

Returns:

Type Description
ThreadResult

A ThreadResult with status, row count, write mode, target path, and optional telemetry.

Raises:

Type Description
ExecutionError

If any step fails, with thread_name set on the error.

DataValidationError

If a fatal-severity validation rule fails.

ExportError

If an export with on_failure: abort fails.

build_plan(weave_name, threads, thread_entries, lookups=None)

Build an execution plan from a weave's threads and declared/inferred dependencies.

Performs DAG construction, cycle detection, topological sort, and cache analysis to produce an immutable ExecutionPlan.

When lookups are provided, the planner also:

  • Infers implicit dependencies between threads mediated by lookups (a thread consuming a lookup whose source is produced by another thread depends on that producer).
  • Computes a lookup schedule mapping each execution group boundary to the lookups that should be materialized before that group runs.

Parameters:

Name Type Description Default
weave_name str

Name of the weave being planned.

required
threads dict[str, Thread]

Mapping of thread name to Thread config.

required
thread_entries list[ThreadEntry]

Ordered thread entries from the weave config, which may carry explicit dependency declarations.

required
lookups dict[str, Lookup] | None

Optional weave-level lookup definitions. When provided, lookup-mediated dependencies are inferred and a materialization schedule is computed.

None

Returns:

Type Description
ExecutionPlan

An immutable ExecutionPlan.

Raises:

Type Description
ConfigError

If a circular dependency is detected (with cycle path in the message), or if an explicit dependency references a thread that does not exist in threads.
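The cycle detection behind the ConfigError above (including the cycle path in the message) can be sketched with a depth-first search. This is an illustrative sketch, not the planner's code; `detect_cycle` is a hypothetical helper, and it raises ValueError where the library raises ConfigError.

```python
# Sketch of cycle detection with the cycle path reported in the error
# message, mirroring the ConfigError behavior documented above.

def detect_cycle(dependencies):
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {t: WHITE for t in dependencies}
    stack = []

    def visit(node):
        color[node] = GRAY          # node is on the current DFS path
        stack.append(node)
        for dep in dependencies.get(node, []):
            if color.get(dep) == GRAY:
                # Back edge: the path from dep to here forms the cycle.
                cycle = stack[stack.index(dep):] + [dep]
                raise ValueError("circular dependency: " + " -> ".join(cycle))
            if color.get(dep, WHITE) == WHITE:
                visit(dep)
        stack.pop()
        color[node] = BLACK         # fully explored

    for t in dependencies:
        if color[t] == WHITE:
            visit(t)

detect_cycle({"a": [], "b": ["a"]})          # acyclic: no error
try:
    detect_cycle({"a": ["b"], "b": ["a"]})
except ValueError as exc:
    assert "circular dependency" in str(exc)
```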

execute_loom(spark, loom, weaves, threads, params=None)

Execute weaves sequentially in declared order.

Iterates the weaves listed in loom.weaves, builds an ExecutionPlan for each, and executes them via execute_weave. Creates a root span collector for telemetry and passes it through the execution tree.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
loom Loom

Loom configuration declaring the ordered list of weave names.

required
weaves dict[str, Weave]

Mapping of weave name to Weave config.

required
threads dict[str, dict[str, Thread]]

Nested mapping of weave_name → thread_name → Thread config.

required
params dict[str, Any] | None

Parameters for condition evaluation at weave and thread levels.

None

Returns:

Type Description
LoomResult

A LoomResult with aggregate status and per-weave results.

execute_weave(spark, plan, threads, collector=None, parent_span_id=None, thread_conditions=None, params=None, weave_span_label=None, pre_steps=None, post_steps=None, lookups=None, variables=None, loom_name='', weave_name='', column_set_defs=None, pre_cached_lookups=None, connections=None)

Execute threads according to the execution plan.

Processes each parallel group sequentially, submitting threads within a group to a concurrent.futures.ThreadPoolExecutor for concurrent execution. Respects the on_failure config on each thread and manages the cache lifecycle via CacheManager.

When hooks are provided, the lifecycle is:

1. Initialize VariableContext from variables.
2. Execute pre_steps.
3. Materialize lookups (materialize=True).
4. Materialize column sets.
5. Execute threads (with cached lookup DataFrames).
6. Execute post_steps.
7. Cleanup lookups.
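The group-wise execution model can be sketched with concurrent.futures: groups run sequentially in topological order, threads within a group run concurrently on a pool. `run_groups` is a hypothetical helper, not the library's executor.

```python
from concurrent.futures import ThreadPoolExecutor

# Sketch of the execution model described above: groups run sequentially,
# threads within a group run concurrently on a thread pool.

def run_groups(execution_order, run_thread):
    results = {}
    with ThreadPoolExecutor() as pool:
        for group in execution_order:          # groups in topological order
            futures = {name: pool.submit(run_thread, name) for name in group}
            for name, fut in futures.items():  # wait for the whole group
                results[name] = fut.result()
    return results

order = [["bronze"], ["silver_a", "silver_b"], ["gold"]]
results = run_groups(order, lambda name: f"{name}: success")
assert results["gold"] == "gold: success"
assert len(results) == 4
```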

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
plan ExecutionPlan

Immutable ExecutionPlan produced by build_plan.

required
threads dict[str, Thread]

Mapping of thread name to Thread config.

required
collector SpanCollector | None

Optional span collector for telemetry. When provided, per-thread collectors are created and merged after execution.

None
parent_span_id str | None

Optional parent span ID for trace tree linkage.

None
thread_conditions dict[str, ConditionSpec] | None

Mapping of thread name to condition spec. Threads with a condition that evaluates to False are skipped.

None
params dict[str, Any] | None

Parameters for condition evaluation.

None
weave_span_label str | None

Label for the weave telemetry span. When set, used instead of plan.weave_name. Typically the weave's qualified key.

None
pre_steps Sequence[HookStep] | None

Optional pre-execution hook steps.

None
post_steps Sequence[HookStep] | None

Optional post-execution hook steps.

None
lookups dict[str, Lookup] | None

Optional weave-level lookup definitions.

None
variables dict[str, VariableSpec] | None

Optional weave-level variable specs.

None
loom_name str

Loom name passed to threads for audit column context.

''
weave_name str

Weave name passed to threads for audit column context.

''
column_set_defs dict[str, ColumnSet] | None

Merged column set definitions (loom-level merged with weave-level, weave wins) to be materialized and forwarded to each thread for rename step resolution.

None
pre_cached_lookups dict[str, Any] | None

Already-materialized lookup DataFrames from a parent scope (e.g. loom level). Seeded into the weave's cached lookup dict so threads can reference them without re-materializing.

None
connections dict[str, OneLakeConnection] | None

Merged loom + weave connections forwarded to materialize_column_sets so connection-backed column sets can resolve their lakehouse target.

None

Returns:

Type Description
WeaveResult

A WeaveResult with aggregate status and per-thread results.