# Operations API
The weevr.operations module implements the concrete readers, transformation
steps, and writers that the engine dispatches to. Each pipeline step type maps
to an operation function.
## weevr.operations
weevr operations — source readers, pipeline steps, hashing, mapping, writers, and validation.
### `__all__` *(module attribute)*

    __all__ = [
        'apply_target_mapping', 'compute_keys', 'evaluate_assertions',
        'read_source', 'read_sources', 'run_pipeline', 'validate_dataframe',
        'ValidationOutcome', 'write_quarantine', 'write_target',
    ]
### ValidationOutcome
Result of validating a DataFrame against a set of rules.
Attributes:

| Name | Description |
|---|---|
| `clean_df` | Rows that passed all error-severity rules. |
| `quarantine_df` | Rows that failed at least one error-severity rule, exploded by failed rule with metadata columns. `None` when no error-severity rules exist or when a fatal rule triggered. |
| `validation_results` | Per-rule pass/fail counts. |
| `has_fatal` | `True` if any fatal-severity rule had failures. |
#### __init__(clean_df, quarantine_df, validation_results, has_fatal)
Initialize with clean/quarantine splits and validation results.
### evaluate_assertions(spark, assertions, target_path)
Evaluate post-write assertions against a Delta target table.
Supports four assertion types: `row_count`, `column_not_null`, `unique`, and `expression`. Each assertion produces an `AssertionResult` with pass/fail status, details, and the configured severity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | *required* |
| `assertions` | `list[Assertion]` | List of assertion definitions to evaluate. | *required* |
| `target_path` | `str` | Path to the Delta table to assert against. | *required* |
Returns:

| Type | Description |
|---|---|
| `list[AssertionResult]` | List of `AssertionResult`, one per input assertion. |
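The four assertion types can be illustrated with a plain-Python sketch over a list of row dicts. The `check` helper and its dict-based assertion shape are hypothetical; the real function evaluates equivalent checks with Spark SQL against the Delta table at `target_path`.

```python
def check(rows, assertion):
    """Return True if `assertion` passes over `rows` (list of dicts)."""
    kind = assertion["type"]
    if kind == "row_count":
        # e.g. {"type": "row_count", "min": 1}
        return len(rows) >= assertion.get("min", 0)
    if kind == "column_not_null":
        col = assertion["column"]
        return all(row.get(col) is not None for row in rows)
    if kind == "unique":
        values = [row[assertion["column"]] for row in rows]
        return len(values) == len(set(values))
    if kind == "expression":
        # In weevr this is a SQL expression string; a callable stands in here.
        return all(assertion["predicate"](row) for row in rows)
    raise ValueError(f"unsupported assertion type: {kind}")

rows = [{"id": 1, "amount": 10}, {"id": 2, "amount": 5}]
print(check(rows, {"type": "row_count", "min": 1}))             # True
print(check(rows, {"type": "unique", "column": "id"}))          # True
print(check(rows, {"type": "column_not_null", "column": "amount"}))  # True
```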
### compute_keys(df, keys)
Apply key and hash computations declared in a `KeyConfig`.

Steps are applied in this order:

1. Validate that the business key columns exist.
2. Compute the surrogate key (if configured).
3. Compute the change detection hash (if configured).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Input DataFrame. | *required* |
| `keys` | `KeyConfig` | Key configuration: business key columns, surrogate key spec, change detection spec. | *required* |
Returns:

| Type | Description |
|---|---|
| `DataFrame` | DataFrame with surrogate key and/or change detection hash columns appended. |
Raises:

| Type | Description |
|---|---|
| `ExecutionError` | If any declared business key column is missing from `df`. |
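The three steps above can be sketched over plain dicts. The output column names (`surrogate_key`, `change_hash`) and the SHA-256-over-concatenated-values scheme are assumptions for illustration, not weevr's actual hash spec:

```python
import hashlib

def compute_keys_sketch(rows, business_keys, change_cols=None):
    """Append a surrogate key and optional change-detection hash to each
    row dict (dicts stand in for DataFrame rows)."""
    # 1. Validate that business key columns exist (weevr raises ExecutionError).
    for row in rows:
        missing = [k for k in business_keys if k not in row]
        if missing:
            raise KeyError(f"missing business key columns: {missing}")
    out = []
    for row in rows:
        row = dict(row)
        # 2. Surrogate key: hash of the concatenated business key values.
        row["surrogate_key"] = hashlib.sha256(
            "|".join(str(row[k]) for k in business_keys).encode()
        ).hexdigest()
        # 3. Change detection hash over the tracked columns, if configured.
        if change_cols:
            row["change_hash"] = hashlib.sha256(
                "|".join(str(row[c]) for c in change_cols).encode()
            ).hexdigest()
        out.append(row)
    return out
```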
### run_pipeline(df, steps, sources)
Execute a sequence of pipeline steps against a working DataFrame.
Steps are applied in order. Join and union steps may reference other loaded source DataFrames via `sources`. Any Spark exception raised during a step is wrapped in `ExecutionError` with step index and type context attached.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Initial working DataFrame (typically the primary source). | *required* |
| `steps` | `list[Step]` | Ordered list of pipeline step configurations. | *required* |
| `sources` | `dict[str, DataFrame]` | All loaded source DataFrames, keyed by alias. Required for join and union steps that reference secondary sources. | *required* |
Returns:

| Type | Description |
|---|---|
| `DataFrame` | Final DataFrame after all steps have been applied. |
Raises:

| Type | Description |
|---|---|
| `ExecutionError` | If any step fails, with `step_index` and `step_type` set. |
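The dispatch-and-wrap loop can be sketched with list-of-dict "DataFrames". The `filter` and `union` step shapes are illustrative stand-ins, and `RuntimeError` stands in for weevr's `ExecutionError`:

```python
def run_pipeline_sketch(df, steps, sources):
    """Apply steps in order; a union step pulls a second list-of-dicts
    out of `sources` by alias."""
    for index, step in enumerate(steps):
        try:
            if step["type"] == "filter":
                df = [row for row in df if step["predicate"](row)]
            elif step["type"] == "union":
                df = df + sources[step["source"]]
            else:
                raise ValueError(f"unknown step type: {step['type']}")
        except Exception as exc:
            # weevr wraps failures in ExecutionError with step_index and
            # step_type attached; RuntimeError stands in here.
            raise RuntimeError(
                f"step {index} ({step.get('type')}) failed: {exc}"
            ) from exc
    return df
```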
### write_quarantine(spark, quarantine_df, target_path)
Write quarantine rows to a Delta table at `{target_path}_quarantine`.

Writes using overwrite mode for idempotent re-execution. Skips the write entirely if `quarantine_df` is `None` or empty.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | *required* |
| `quarantine_df` | `DataFrame \| None` | DataFrame with quarantine metadata columns (`__rule_name`, `__rule_expression`, `__severity`, `__quarantine_ts`). May be `None` if no rows need quarantining. | *required* |
| `target_path` | `str` | Path of the original target Delta table. The quarantine table is written to `{target_path}_quarantine`. | *required* |
Returns:

| Type | Description |
|---|---|
| `int` | Number of rows written, or 0 if no write occurred. |
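The skip-and-overwrite contract sketched in plain Python; the `writer` callable is a hypothetical stand-in for the Delta overwrite:

```python
def quarantine_path(target_path):
    """The quarantine table lives next to the target table."""
    return f"{target_path}_quarantine"

def write_quarantine_sketch(quarantine_df, target_path, writer):
    """Skip the write entirely when there is nothing to quarantine;
    otherwise overwrite the quarantine table and return the row count."""
    if quarantine_df is None or len(quarantine_df) == 0:
        return 0
    writer(quarantine_path(target_path), quarantine_df)
    return len(quarantine_df)
```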
### read_source(spark, alias, source)
Read a single source into a DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | *required* |
| `alias` | `str` | Logical name for this source (used in error messages). | *required* |
| `source` | `Source` | Source configuration. | *required* |
Returns:

| Type | Description |
|---|---|
| `DataFrame` | DataFrame with source data, with dedup applied if configured. |
Raises:

| Type | Description |
|---|---|
| `ExecutionError` | If the source cannot be read or the type is unsupported. |
### read_sources(spark, sources)
Read all sources into a mapping of alias -> DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | *required* |
| `sources` | `dict[str, Source]` | Mapping of alias -> `Source` config. | *required* |
Returns:

| Type | Description |
|---|---|
| `dict[str, DataFrame]` | Mapping of alias -> DataFrame, in the same key order as `sources`. |
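The key-order guarantee follows from Python dicts preserving insertion order; a sketch, with `read_one` standing in for `read_source`:

```python
def read_sources_sketch(sources, read_one):
    """Read every configured source; the returned mapping preserves the
    alias order of `sources` because dicts keep insertion order."""
    return {alias: read_one(alias, cfg) for alias, cfg in sources.items()}
```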
### validate_dataframe(df, rules)
Evaluate all validation rules in a single pass with severity routing.

Rules are evaluated as boolean Spark SQL expressions. Each row is tagged with pass/fail per rule, then routed based on severity:

- `info`/`warn`: logged only; rows stay in `clean_df`.
- `error`: rows moved to `quarantine_df` (one row per failed rule).
- `fatal`: `has_fatal=True`, no split performed (caller should abort).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Input DataFrame to validate. | *required* |
| `rules` | `list[ValidationRule]` | Validation rules to evaluate. | *required* |
Returns:

| Type | Description |
|---|---|
| `ValidationOutcome` | `ValidationOutcome` with clean/quarantine split and per-rule results. |
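The severity routing can be illustrated over plain dicts. The rule shape here (a `predicate` callable) is an assumption; weevr rules use Spark SQL expression strings, and the dict result stands in for `ValidationOutcome`:

```python
def validate_sketch(rows, rules):
    """Route rows by rule severity: info/warn are counted only, error
    rows split into quarantine (one entry per failed rule), and any
    fatal failure sets has_fatal without performing a split."""
    results, quarantine = [], []
    failed_error_rows = set()
    has_fatal = False
    for rule in rules:
        failures = [i for i, row in enumerate(rows)
                    if not rule["predicate"](row)]
        results.append({"rule": rule["name"], "failed": len(failures)})
        if rule["severity"] == "fatal" and failures:
            has_fatal = True
        elif rule["severity"] == "error":
            for i in failures:
                quarantine.append({**rows[i], "__rule_name": rule["name"]})
                failed_error_rows.add(i)
        # info/warn failures are recorded in results only
    if has_fatal:
        return {"clean": rows, "quarantine": None,
                "results": results, "has_fatal": True}
    clean = [row for i, row in enumerate(rows) if i not in failed_error_rows]
    return {"clean": clean, "quarantine": quarantine,
            "results": results, "has_fatal": False}
```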
### apply_target_mapping(df, target, spark)
Apply target column mapping to shape the DataFrame for writing.
Column-level transformations (`expr`, `type`, `default`, `drop`) are applied first. Then the column set is narrowed according to the mapping mode:

- `auto`: if the target table already exists, keep only columns present in its schema. If it does not yet exist, all columns pass through.
- `explicit`: keep only columns explicitly declared in `target.columns` (`drop` columns excluded).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Input DataFrame. | *required* |
| `target` | `Target` | Target configuration including mapping mode and column specs. | *required* |
| `spark` | `SparkSession` | Active SparkSession (used to probe existing target schema). | *required* |
Returns:

| Type | Description |
|---|---|
| `DataFrame` | DataFrame shaped for writing to the target. |
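The narrowing rules for the two mapping modes, sketched for a single row dict (the `target_columns` shape with `name`/`drop` keys is an illustrative assumption):

```python
def narrow_columns(row, mode, target_columns=None, existing_schema=None):
    """Narrow one row's columns after per-column transforms have run."""
    if mode == "auto":
        if existing_schema is None:
            return dict(row)  # target not created yet: everything passes through
        return {k: v for k, v in row.items() if k in existing_schema}
    if mode == "explicit":
        keep = {c["name"] for c in target_columns if not c.get("drop")}
        return {k: v for k, v in row.items() if k in keep}
    raise ValueError(f"unknown mapping mode: {mode}")
```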
### write_target(spark, df, target, write_config, target_path)
Write a DataFrame to a Delta table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | *required* |
| `df` | `DataFrame` | DataFrame to write. | *required* |
| `target` | `Target` | Target configuration (`partition_by`). | *required* |
| `write_config` | `WriteConfig \| None` | Write mode and merge parameters. Defaults to overwrite when `None`. | *required* |
| `target_path` | `str` | Physical path for the Delta table. | *required* |
Returns:

| Type | Description |
|---|---|
| `int` | Number of rows in `df`. |
Raises:

| Type | Description |
|---|---|
| `ExecutionError` | If the write operation fails. |
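The mode-defaulting and error-wrapping behavior, sketched with a `write` callable standing in for the Delta writer and `RuntimeError` standing in for `ExecutionError`:

```python
def write_target_sketch(df, write_config, write):
    """Default to overwrite mode when no write config is given, and
    wrap any write failure."""
    mode = "overwrite" if write_config is None else write_config["mode"]
    try:
        write(df, mode)
    except Exception as exc:
        raise RuntimeError(f"write failed: {exc}") from exc
    return len(df)
```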