Operations API

The weevr.operations module implements the concrete readers, transformation steps, and writers that the engine dispatches to. Each pipeline step type maps to an operation function.

weevr.operations

weevr operations — source readers, pipeline steps, hashing, mapping, writers, and validation.

__all__ = ['apply_target_mapping', 'compute_keys', 'evaluate_assertions', 'read_source', 'read_sources', 'run_pipeline', 'validate_dataframe', 'ValidationOutcome', 'write_quarantine', 'write_target']

ValidationOutcome

Result of validating a DataFrame against a set of rules.

Attributes:

  clean_df: Rows that passed all error-severity rules.
  quarantine_df: Rows that failed at least one error-severity rule, exploded by failed rule with metadata columns. None when no error-severity rules exist or when a fatal rule triggered.
  validation_results: Per-rule pass/fail counts.
  has_fatal: True if any fatal-severity rule had failures.

__init__(clean_df, quarantine_df, validation_results, has_fatal)

Initialize with clean/quarantine splits and validation results.
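The attributes above can be pictured as a plain container. A minimal sketch, assuming a dataclass-style shape (the actual class may be implemented differently, and the DataFrame fields here are stubbed with plain lists):

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class ValidationOutcome:
    clean_df: Any                 # rows that passed all error-severity rules
    quarantine_df: Optional[Any]  # failing rows exploded by rule, or None
    validation_results: list      # per-rule pass/fail counts
    has_fatal: bool               # True if any fatal-severity rule failed

outcome = ValidationOutcome(clean_df=[], quarantine_df=None,
                            validation_results=[], has_fatal=False)
print(outcome.has_fatal)  # False
```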

evaluate_assertions(spark, assertions, target_path)

Evaluate post-write assertions against a Delta target table.

Supports four assertion types: row_count, column_not_null, unique, and expression. Each assertion produces an AssertionResult with pass/fail status, details, and the configured severity.

Parameters:

  spark (SparkSession): Active SparkSession. Required.
  assertions (list[Assertion]): List of assertion definitions to evaluate. Required.
  target_path (str): Path to the Delta table to assert against. Required.

Returns:

  list[AssertionResult]: One AssertionResult per input assertion.
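To make the pass/fail/severity contract concrete, here is an illustrative sketch of evaluating a row_count assertion against an already-materialized list of rows. This is not weevr's implementation; the AssertionResult fields and the evaluate_row_count helper are assumptions for the example:

```python
from dataclasses import dataclass

@dataclass
class AssertionResult:
    name: str        # assertion type, e.g. "row_count"
    passed: bool     # pass/fail status
    details: str     # human-readable explanation
    severity: str    # configured severity carried through

def evaluate_row_count(rows, min_rows, severity="error"):
    """Check that the target holds at least min_rows rows."""
    count = len(rows)
    return AssertionResult(
        name="row_count",
        passed=count >= min_rows,
        details=f"expected >= {min_rows}, got {count}",
        severity=severity,
    )

result = evaluate_row_count([{"id": 1}, {"id": 2}], min_rows=1)
print(result.passed)  # True
```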

compute_keys(df, keys)

Apply key and hash computations declared in a KeyConfig.

Steps are applied in this order:

  1. Validate business key columns exist.
  2. Compute surrogate key (if configured).
  3. Compute change detection hash (if configured).

Parameters:

  df (DataFrame): Input DataFrame.
  keys (KeyConfig): Key configuration: business key columns, surrogate key spec, change detection spec.

Returns:

  DataFrame: DataFrame with surrogate key and/or change detection hash columns appended.

Raises:

  ExecutionError: If any declared business key column is missing from df.
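A common way to implement a surrogate key of this kind is a stable hash over the business key columns. The sketch below shows that pattern on a plain dict row; it is an illustration of the concept, not weevr's actual hashing scheme (the delimiter and hash function are assumptions):

```python
import hashlib

def surrogate_key(row: dict, business_keys: list[str]) -> str:
    """Hash the business key columns into a deterministic surrogate key."""
    missing = [k for k in business_keys if k not in row]
    if missing:
        # mirrors the "validate business key columns exist" step
        raise KeyError(f"missing business key columns: {missing}")
    joined = "||".join(str(row[k]) for k in business_keys)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

key = surrogate_key({"customer_id": 42, "region": "EU"},
                    ["customer_id", "region"])
print(len(key))  # 64 hex characters
```

The same input always yields the same key, which is what makes hash-based surrogate keys safe to recompute on re-runs.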

run_pipeline(df, steps, sources)

Execute a sequence of pipeline steps against a working DataFrame.

Steps are applied in order. Join and union steps may reference other loaded source DataFrames via sources. Any Spark exception raised during a step is wrapped in weevr.errors.exceptions.ExecutionError with step index and type context attached.

Parameters:

  df (DataFrame): Initial working DataFrame (typically the primary source).
  steps (list[Step]): Ordered list of pipeline step configurations.
  sources (dict[str, DataFrame]): All loaded source DataFrames, keyed by alias. Needed for join and union steps that reference secondary sources.

Returns:

  DataFrame: Final DataFrame after all steps have been applied.

Raises:

  ExecutionError: If any step fails, with step_index and step_type set.
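The ordered-dispatch-with-context pattern described above can be sketched in plain Python. This is not weevr's code; steps are modeled as (step_type, function) pairs and the ExecutionError class is a stand-in showing how step_index and step_type get attached:

```python
class ExecutionError(Exception):
    """Wraps a step failure with pipeline position context."""
    def __init__(self, message, step_index=None, step_type=None):
        super().__init__(message)
        self.step_index = step_index
        self.step_type = step_type

def run_steps(value, steps):
    for i, (step_type, fn) in enumerate(steps):
        try:
            value = fn(value)          # apply the step to the working value
        except Exception as exc:
            # re-raise with context so failures are easy to locate
            raise ExecutionError(str(exc), step_index=i,
                                 step_type=step_type) from exc
    return value

result = run_steps(10, [("double", lambda v: v * 2),
                        ("increment", lambda v: v + 1)])
print(result)  # 21
```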

write_quarantine(spark, quarantine_df, target_path)

Write quarantine rows to a Delta table at {target_path}_quarantine.

Writes using overwrite mode for idempotent re-execution. Skips the write entirely if quarantine_df is None or empty.

Parameters:

  spark (SparkSession): Active SparkSession. Required.
  quarantine_df (DataFrame | None): DataFrame with quarantine metadata columns (__rule_name, __rule_expression, __severity, __quarantine_ts). May be None if no rows need quarantining. Required.
  target_path (str): Path of the original target Delta table. The quarantine table is written to {target_path}_quarantine. Required.

Returns:

  int: Number of rows written, or 0 if no write occurred.
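The path derivation and skip-on-empty contract can be sketched as follows. This is an illustration of the documented behavior, not weevr's code; the actual Delta write is elided:

```python
def quarantine_path(target_path: str) -> str:
    """Derive the quarantine table path from the target path."""
    return f"{target_path}_quarantine"

def write_quarantine_sketch(rows, target_path):
    if rows is None or len(rows) == 0:
        return 0  # skip the write entirely, per the documented contract
    path = quarantine_path(target_path)
    # ... an overwrite-mode Delta write to `path` would happen here ...
    return len(rows)

print(write_quarantine_sketch(None, "/delta/orders"))         # 0
print(write_quarantine_sketch([{"id": 1}], "/delta/orders"))  # 1
```

Overwrite mode makes the quarantine write idempotent: re-running a job replaces the previous quarantine rows rather than duplicating them.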

read_source(spark, alias, source)

Read a single source into a DataFrame.

Parameters:

  spark (SparkSession): Active SparkSession. Required.
  alias (str): Logical name for this source (used in error messages). Required.
  source (Source): Source configuration. Required.

Returns:

  DataFrame: DataFrame with source data, with dedup applied if configured.

Raises:

  ExecutionError: If the source cannot be read or the type is unsupported.

read_sources(spark, sources)

Read all sources into a mapping of alias -> DataFrame.

Parameters:

  spark (SparkSession): Active SparkSession. Required.
  sources (dict[str, Source]): Mapping of alias -> Source config. Required.

Returns:

  dict[str, DataFrame]: Mapping of alias -> DataFrame, in the same key order as sources.
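The key-order guarantee falls out naturally if the result is built by iterating the input mapping, since Python dicts preserve insertion order. A minimal sketch (not weevr's code; read_one stands in for the per-source reader):

```python
def read_sources_sketch(sources, read_one):
    """Read every source, keyed by alias, preserving input key order."""
    return {alias: read_one(alias, cfg) for alias, cfg in sources.items()}

loaded = read_sources_sketch(
    {"orders": "orders.csv", "customers": "customers.csv"},
    read_one=lambda alias, cfg: f"df-from-{cfg}",  # stub reader
)
print(list(loaded))  # ['orders', 'customers']
```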

validate_dataframe(df, rules)

Evaluate all validation rules in a single pass with severity routing.

Rules are evaluated as boolean Spark SQL expressions. Each row is tagged with pass/fail per rule, then routed based on severity:

  • info/warn: logged only; rows stay in clean_df.
  • error: rows moved to quarantine_df (one row per failed rule).
  • fatal: has_fatal=True, no split performed (caller should abort).

Parameters:

  df (DataFrame): Input DataFrame to validate. Required.
  rules (list[ValidationRule]): Validation rules to evaluate. Required.

Returns:

  ValidationOutcome: Clean/quarantine split and per-rule results.
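The "one quarantine row per failed rule" explosion is the non-obvious part of this contract. The sketch below illustrates it on plain dict rows, considering only error-severity rules (info/warn and fatal handling are omitted); it is not weevr's implementation, and the rule shape is an assumption:

```python
def route_error_rules(rows, rules):
    """Split rows into clean vs quarantine, one quarantine row per failed rule."""
    clean, quarantine = [], []
    for row in rows:
        failed = [r for r in rules if not r["check"](row)]
        if failed:
            for r in failed:
                quarantine.append({**row, "__rule_name": r["name"]})
        else:
            clean.append(row)
    return clean, quarantine

rules = [
    {"name": "id_not_null",
     "check": lambda row: row["id"] is not None},
    {"name": "id_positive",
     "check": lambda row: row["id"] is not None and row["id"] > 0},
]
clean, quarantine = route_error_rules([{"id": 1}, {"id": None}], rules)
print(len(clean), len(quarantine))  # 1 2
```

Note the null row appears twice in quarantine because it failed both rules, which matches the "exploded by failed rule" wording in ValidationOutcome.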

apply_target_mapping(df, target, spark)

Apply target column mapping to shape the DataFrame for writing.

Column-level transformations (expr, type, default, drop) are applied first. Then the column set is narrowed according to the mapping mode:

  • auto: if the target table already exists, keep only columns present in its schema. If it does not yet exist, all columns pass through.
  • explicit: keep only the columns explicitly declared in target.columns (columns marked drop are excluded).

Parameters:

  df (DataFrame): Input DataFrame. Required.
  target (Target): Target configuration including mapping mode and column specs. Required.
  spark (SparkSession): Active SparkSession (used to probe the existing target schema). Required.

Returns:

  DataFrame: DataFrame shaped for writing to the target.
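The two mapping modes can be sketched as pure column-set logic, shown here on plain column-name lists rather than a Spark DataFrame. This is an illustration of the documented narrowing rules, not weevr's code:

```python
from typing import Optional

def narrow_columns(df_cols: list, mode: str,
                   target_cols: Optional[list] = None,
                   existing_schema: Optional[list] = None) -> list:
    if mode == "auto":
        if existing_schema is None:
            return list(df_cols)   # target doesn't exist yet: pass through
        return [c for c in df_cols if c in existing_schema]
    if mode == "explicit":
        return [c for c in df_cols if c in (target_cols or [])]
    raise ValueError(f"unknown mapping mode: {mode}")

print(narrow_columns(["a", "b", "c"], "auto",
                     existing_schema=["a", "c"]))              # ['a', 'c']
print(narrow_columns(["a", "b"], "auto"))                      # ['a', 'b']
print(narrow_columns(["a", "b"], "explicit", target_cols=["b"]))  # ['b']
```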

write_target(spark, df, target, write_config, target_path)

Write a DataFrame to a Delta table.

Parameters:

  spark (SparkSession): Active SparkSession. Required.
  df (DataFrame): DataFrame to write. Required.
  target (Target): Target configuration (partition_by). Required.
  write_config (WriteConfig | None): Write mode and merge parameters. Defaults to overwrite when None. Required.
  target_path (str): Physical path for the Delta table. Required.

Returns:

  int: Number of rows in df (rows written for overwrite/append; input rows for merge).

Raises:

  ExecutionError: If the write operation fails.
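The write_config defaulting rule is worth pinning down, since a None config is valid input. A minimal sketch of that resolution, assuming a dict-shaped config with a "mode" field (the real WriteConfig may carry more structure):

```python
from typing import Optional

def resolve_write_mode(write_config: Optional[dict]) -> str:
    """Resolve the effective write mode; None falls back to overwrite."""
    if write_config is None:
        return "overwrite"
    return write_config.get("mode", "overwrite")

print(resolve_write_mode(None))               # overwrite
print(resolve_write_mode({"mode": "merge"}))  # merge
```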