# Engine Internals
This guide explains how weevr's execution engine turns configuration into running Spark pipelines. It covers the planning, execution, and result aggregation subsystems.
## Introduction
The engine is responsible for a single question: given a set of configured threads, in what order should they run, and how should each thread's pipeline execute? The answer involves dependency analysis, parallel scheduling, caching, conditional execution, and structured result collection.
## Key concepts
- ExecutionPlan — An immutable snapshot of thread ordering, dependency edges, parallel execution groups, cache targets, lookup dependency mappings (`lookup_producers`, `lookup_consumers`), and a `lookup_schedule` that assigns each lookup to its materialization point. Produced by the planner before any data is read. In notebooks, an `ExecutionPlan` renders automatically via `_repr_html_()`. Call `plan.dag()` to get a `DAGDiagram` SVG that can be saved with `dag.save("plan.svg")`. For plan mode results, `result.dag()` returns either a single-weave DAG or a loom-level swimlane diagram depending on how many weaves are present.
- DAG — A directed acyclic graph of thread dependencies. Dependencies are inferred from source/target path overlap and can also be declared explicitly.
- Parallel execution group — A set of threads with no mutual dependencies that can run concurrently on the same SparkSession.
- Cache target — A thread whose output is read by two or more downstream threads. The engine caches the DataFrame in memory to avoid re-reading from Delta.
## How it works
Execution follows a three-stage pipeline: plan → execute → aggregate.
### Planning
The planner builds a DAG from thread configurations:
- Infer dependencies — If thread B reads from thread A's target path, B depends on A. This happens automatically by matching source paths/aliases to target paths/aliases across all threads.
- Merge explicit dependencies — Weave entries can declare additional dependencies that are not data-based (e.g., ordering constraints).
- Detect cycles — DFS with three-color marking (white/gray/black) finds back edges. If a cycle exists, execution fails immediately with a clear error showing the cycle path.
- Topological sort — Kahn's algorithm produces a sequence of parallel groups. Threads within a group have no mutual dependencies and can run concurrently.
- Analyze cache targets — Threads with two or more downstream dependents are flagged for caching, unless the thread explicitly sets `cache: false`. Setting `cache: true` is a no-op: only `cache: false` changes behavior, and `cache: true` does not force caching for a thread with fewer than two dependents.
The result is an immutable `ExecutionPlan` that the executor consumes without modification.
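The grouping and cache analysis above can be sketched in plain Python. This is a minimal illustration of the technique, not the planner's actual code; the function names and the `deps` mapping shape are hypothetical.

```python
from collections import deque

def parallel_groups(deps: dict[str, set[str]]) -> list[list[str]]:
    """Kahn's algorithm, emitting one group per 'wave' of ready threads.

    `deps` maps each thread to the set of threads it depends on.
    Raises ValueError if a cycle prevents a complete ordering.
    """
    indegree = {t: len(d) for t, d in deps.items()}
    dependents: dict[str, set[str]] = {t: set() for t in deps}
    for t, d in deps.items():
        for upstream in d:
            dependents[upstream].add(t)

    ready = deque(sorted(t for t, n in indegree.items() if n == 0))
    groups: list[list[str]] = []
    placed = 0
    while ready:
        group = sorted(ready)  # everything currently ready has no mutual deps
        ready.clear()
        groups.append(group)
        placed += len(group)
        for t in group:
            for downstream in dependents[t]:
                indegree[downstream] -= 1
                if indegree[downstream] == 0:
                    ready.append(downstream)
    if placed != len(deps):
        raise ValueError("cycle detected among remaining threads")
    return groups

def cache_targets(deps: dict[str, set[str]]) -> set[str]:
    """Threads read by two or more downstream threads."""
    counts: dict[str, int] = {}
    for d in deps.values():
        for upstream in d:
            counts[upstream] = counts.get(upstream, 0) + 1
    return {t for t, n in counts.items() if n >= 2}
```

For a diamond (`B` and `C` read `A`'s target, `D` reads both), this yields groups `[["A"], ["B", "C"], ["D"]]` and flags `A` as a cache target.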
### Execution
Thread execution is orchestrated at two levels:
Loom level (`execute_loom`) — Iterates weaves in declared order. Each weave gets its own plan and executor pass. Weave-level conditions are evaluated before planning; a false condition skips the entire weave. If a weave's status is "failure", the loom stops executing further weaves — remaining weaves are skipped.
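The loom loop's skip-and-stop behavior can be modeled in a few lines. A sketch only: the callables and return values here are hypothetical stand-ins for the real weave lifecycle.

```python
def execute_loom(weaves, execute_weave, condition_true):
    """Iterate weaves in order; skip false conditions, and once a weave
    fails, mark every remaining weave as skipped without running it."""
    statuses = {}
    failed = False
    for weave in weaves:
        if failed or not condition_true(weave):
            statuses[weave] = "skipped"
            continue
        statuses[weave] = execute_weave(weave)
        if statuses[weave] == "failure":
            failed = True
    return statuses
```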
Weave level (`execute_weave`) — Orchestrates the full weave lifecycle:
- Initialize variables — A `VariableContext` is created from the weave's `variables` block. Variables are available to hook steps via `${var.name}` placeholders.
- Pre-steps — If the weave defines `pre_steps`, hook steps run before any materialization. Failures with `on_failure: abort` stop the weave. Pre-steps can set variables that downstream materialization and threads observe.
- Lookup materialization (external) — External lookups (those not produced by a thread in the weave) are materialized after pre-steps and before any thread runs. Internal lookups whose source is produced by a thread in group N are deferred and materialized at the group N+1 boundary — after their producer completes.
- Column set materialization — If the weave (or loom) defines `column_sets`, all sets are resolved to from→to mapping dicts. Delta/YAML sources are read via `read_source()`; param sources resolve from runtime parameters. Connection-backed column sets resolve through the merged loom + weave connection dict. Results are captured in `WeaveTelemetry.column_set_results`.
- Thread execution — Parallel groups are processed sequentially. Within each group, threads are submitted to a `ThreadPoolExecutor` for concurrent execution. Threads reference lookups via `source.lookup` and receive the cached DataFrame.
- Post-steps — After all threads complete, `post_steps` hook steps run. Failures with `on_failure: abort` mark the weave as failed.
After each thread completes:
- If it is a cache target, the output DataFrame is persisted.
- Consumer reference counts are decremented; caches are unpersisted when no consumers remain.
- Thread-level telemetry collectors are merged into the weave collector.
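The sequential-groups/concurrent-threads pattern can be sketched with `concurrent.futures`. The `run_thread` callable is a hypothetical stand-in for a single thread's pipeline; the real orchestration adds cache bookkeeping and failure policies.

```python
from concurrent.futures import ThreadPoolExecutor

def run_groups(groups, run_thread, max_workers=4):
    """Run each parallel group concurrently; groups themselves run in
    declared order, so a group only starts after its predecessors finish."""
    results = {}
    for group in groups:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {name: pool.submit(run_thread, name) for name in group}
            for name, fut in futures.items():
                results[name] = fut.result()  # re-raises thread failures here
    return results
```

Because the `with` block waits for the pool to drain, each group forms a barrier: no thread in group N+1 starts before every thread in group N has completed.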
Thread level (`execute_thread`) — A single thread runs through an 18-step pipeline:
1. Initialize thread-level resources (variables, lookups, column_sets)
2. Run thread-level `pre_steps`
3. Resolve lookup-based sources from cached or on-demand DataFrames
4. Read all remaining declared sources (with watermark/CDC filtering if applicable). For `mode: incremental_watermark`, the primary source is filtered against the prior HWM and the new HWM is captured from the filtered DataFrame before transforms. For generic `mode: cdc` (explicit `cdc.operation_column`) composed with a `watermark_column`, the same pattern applies and the HWM aggregate runs before I/U/D operation routing — delete rows participate in advancing the window so append-only CDC history tables (e.g. SAP Open Database Mirror) do not reprocess deletes on every run. Empty filtered windows yield `new_hwm = None`, so step 14 leaves the persisted HWM untouched. The Delta CDF preset path (`cdc.preset: delta_cdf`) is unchanged: it captures `_commit_version` instead of a column-based watermark and rejects `watermark_column` at config-parse time. When `load.watermark_format` is set, both reader paths wrap the column in `to_timestamp`/`to_date(col, format)` so string-typed watermark columns (SAP, mainframe, JSON) can be parsed declaratively.
5. Set the primary (first) source as the working DataFrame
6. Run pipeline steps against the working DataFrame
7. Evaluate validation rules; quarantine or abort on failures
8. Apply column and table naming normalization, including reserved word protection (if configured). Seven strategies are available: `quote`, `prefix`, `suffix`, `error`, `rename`, `revert`, and `drop`. See configuration keys for details.
9. Compute business keys and change detection hashes
10. Resolve the target write path
11. Apply target column mapping
12. Inject audit columns (bypasses mapping mode; applied for all write modes)
13. Write to the Delta target (standard write or CDC merge routing)
14. Persist watermark or CDC state
15. Run post-write assertions
16. Write exports (secondary outputs, if configured)
17. Run thread-level `post_steps`
18. Build telemetry and return `ThreadResult`
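The watermark contract in steps 4 and 14 can be modeled with plain Python in place of Spark. The function names and row representation are hypothetical; the point is the ordering (HWM aggregate on the filtered window, before transforms) and the empty-window behavior.

```python
def filter_and_capture_hwm(rows, watermark_column, prior_hwm):
    """Step 4: filter the incremental window against the prior HWM and
    capture the new HWM from the filtered rows. An empty window yields
    new_hwm = None so the persisted HWM is left untouched."""
    window = [r for r in rows
              if prior_hwm is None or r[watermark_column] > prior_hwm]
    new_hwm = max((r[watermark_column] for r in window), default=None)
    return window, new_hwm

def persist_hwm(state, thread, new_hwm):
    """Step 14: only advance the stored HWM when a new one was captured."""
    if new_hwm is not None:
        state[thread] = new_hwm
    return state
```

Note that the filter includes every row in the window regardless of CDC operation, which is why delete rows still advance the watermark.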
## Failure handling
Each thread has a configurable failure policy:
| Policy | Behavior |
|---|---|
| `abort_weave` | All remaining threads in the weave are skipped (default) |
| `skip_downstream` | Only transitive dependents of the failed thread are skipped |
| `continue` | Same as `skip_downstream` — other independent threads proceed |
Transitive dependents are computed via BFS over the reverse dependency graph.
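A sketch of that BFS, assuming a hypothetical `reverse_deps` mapping from each thread to the threads that read from it:

```python
from collections import deque

def transitive_dependents(reverse_deps, failed):
    """BFS over the reverse dependency graph from the failed thread.
    Returns every thread that must be skipped (excluding `failed` itself)."""
    skipped, queue = set(), deque([failed])
    while queue:
        current = queue.popleft()
        for dependent in reverse_deps.get(current, ()):
            if dependent not in skipped:
                skipped.add(dependent)
                queue.append(dependent)
    return skipped
```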
## Cache lifecycle
The `CacheManager` uses reference counting to manage in-memory DataFrames:
- When a cache-target thread completes, its output DataFrame is persisted directly at `MEMORY_AND_DISK` level (no re-read from Delta).
- Each downstream consumer calls `notify_complete()` after finishing, which decrements the consumer count.
- When the count reaches zero, the cached DataFrame is automatically unpersisted.
- A `cleanup()` call in the executor's `finally` block force-unpersists anything remaining, preventing memory leaks.
Cache failures are non-fatal — if persistence fails, execution continues without the cache optimization.
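A minimal sketch of the lifecycle, with a plain dict standing in for persisted DataFrames. `notify_complete()` and `cleanup()` come from the description above; `persist()` and `get()` are hypothetical names for illustration.

```python
class RefCountedCache:
    """Persist on producer completion, decrement per consumer,
    unpersist at zero, force-clean everything in a finally block."""

    def __init__(self):
        self._entries = {}  # thread -> [value, remaining_consumers]

    def persist(self, thread, value, consumers):
        self._entries[thread] = [value, consumers]

    def get(self, thread):
        return self._entries[thread][0]

    def notify_complete(self, thread):
        entry = self._entries.get(thread)
        if entry is None:
            return
        entry[1] -= 1
        if entry[1] <= 0:  # no consumers remain: unpersist now
            del self._entries[thread]

    def cleanup(self):
        self._entries.clear()  # executor's finally block: no leaks

    def __contains__(self, thread):
        return thread in self._entries
```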
## Module map
| Module | Responsibility |
|---|---|
| `engine/planner.py` | DAG construction, cycle detection, topological sort, cache analysis |
| `engine/executor.py` | Thread-level pipeline: read → transform → validate → write |
| `engine/runner.py` | Weave and loom orchestration, parallel dispatch, failure handling |
| `engine/result.py` | Immutable result models: `ThreadResult`, `WeaveResult`, `LoomResult` |
| `engine/cache_manager.py` | Reference-counted DataFrame caching |
| `engine/conditions.py` | Condition evaluation: parameter resolution, built-in functions, boolean parsing |
| `engine/hooks.py` | Pre/post hook step execution: quality gates, SQL statements, log messages |
| `engine/lookups.py` | Lookup materialization: pre-read, narrow projection, caching/broadcast |
| `engine/variables.py` | Weave-scoped variable binding and resolution |
| `engine/display.py` | SVG visualization (DAG, flow, timeline, waterfall) and HTML result rendering for notebooks |
## Design decisions
- Immutable plans and results — `ExecutionPlan`, `ThreadResult`, `WeaveResult`, and `LoomResult` are all frozen. This prevents accidental mutation during concurrent execution.
- Per-thread telemetry collectors — Each thread gets its own `SpanCollector` during concurrent execution, avoiding lock contention. Collectors merge into the weave-level collector after thread completion.
- Fail-fast cycle detection — Cycles are detected before any data is read, giving clear errors without wasted compute.
- Safe condition evaluation — Conditions are parsed with a recursive descent evaluator (no `eval()`). Only whitelisted operators and built-in functions are supported.
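One common way to get the same safety properties in Python is an `ast`-based walker with a node whitelist. This is not weevr's parser (the engine uses its own recursive descent evaluator); it is a sketch of the "no `eval()`, whitelisted operators only" idea, with `safe_eval` and `params` as hypothetical names.

```python
import ast
import operator

_OPS = {ast.And: all, ast.Or: any,
        ast.Eq: operator.eq, ast.NotEq: operator.ne,
        ast.Lt: operator.lt, ast.LtE: operator.le,
        ast.Gt: operator.gt, ast.GtE: operator.ge}

def safe_eval(expr: str, params: dict):
    """Evaluate a boolean condition without eval(): only whitelisted
    comparison/boolean operators, literals, and known parameter names."""
    def walk(node):
        if isinstance(node, ast.BoolOp):
            return _OPS[type(node.op)](walk(v) for v in node.values)
        if isinstance(node, ast.Compare):
            left = walk(node.left)
            for op, rhs in zip(node.ops, node.comparators):
                right = walk(rhs)
                if not _OPS[type(op)](left, right):
                    return False
                left = right  # supports chained comparisons like 1 < x < 10
            return True
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
            return not walk(node.operand)
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.Name):
            return params[node.id]  # unknown names raise KeyError
        raise ValueError(f"disallowed syntax: {type(node).__name__}")
    return walk(ast.parse(expr, mode="eval").body)
```

Anything outside the whitelist (function calls, attribute access, subscripts) is rejected before evaluation, which is what makes arbitrary code execution impossible by construction.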
## Further reading
- Thread, Weave, Loom — The configuration hierarchy
- Add a Thread — Step-by-step thread creation
- Observability — Telemetry spans and logging