Operations API

The weevr.operations module implements the concrete readers, transformation steps, writers, audit column injection, export writers, and quarantine handling that the engine dispatches to. Each pipeline step type maps to an operation function.

weevr.operations

weevr operations — readers, transforms, hashing, mapping, writers, exports, validation.

__all__ = ['AuditContext', 'apply_target_mapping', 'build_sources_json', 'compute_keys', 'evaluate_assertions', 'inject_audit_columns', 'read_source', 'read_sources', 'resolve_audit_columns', 'resolve_export_path', 'resolve_exports', 'run_pipeline', 'validate_dataframe', 'ValidationOutcome', 'write_export', 'write_quarantine', 'write_target'] module-attribute

_LAZY_ATTRS = {'AuditContext': 'weevr.operations.audit', 'build_sources_json': 'weevr.operations.audit', 'inject_audit_columns': 'weevr.operations.audit', 'resolve_audit_columns': 'weevr.operations.audit', 'evaluate_assertions': 'weevr.operations.assertions', 'resolve_export_path': 'weevr.operations.exports', 'resolve_exports': 'weevr.operations.exports', 'write_export': 'weevr.operations.exports', 'compute_keys': 'weevr.operations.hashing', 'run_pipeline': 'weevr.operations.pipeline', 'write_quarantine': 'weevr.operations.quarantine', 'read_source': 'weevr.operations.readers', 'read_sources': 'weevr.operations.readers', 'ValidationOutcome': 'weevr.operations.validation', 'validate_dataframe': 'weevr.operations.validation', 'apply_target_mapping': 'weevr.operations.writers', 'write_target': 'weevr.operations.writers'} module-attribute

AuditContext dataclass

Execution context for resolving audit column variables.

Attributes:

  • thread_name (str): Thread name (e.g. stg_orders).
  • thread_qualified_key (str): Fully qualified key (e.g. staging.stg_orders).
  • thread_source (str | None): Primary source alias (first declared source).
  • thread_sources_json (str): JSON array of all sources with type metadata.
  • weave_name (str): Weave name (e.g. staging).
  • loom_name (str): Loom name (e.g. my-project).
  • run_timestamp (str): ISO 8601 UTC timestamp of execution start.
  • run_id (str): UUID4 identifier unique per execution.
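
As a rough illustration of how these fields relate, the sketch below constructs an AuditContext by hand. All field values are hypothetical; in practice the engine populates the context itself when execution starts.

```python
import json
import uuid
from datetime import datetime, timezone

from weevr.operations import AuditContext

# Hypothetical values shown for illustration only; the engine normally
# builds this context when a thread starts executing.
context = AuditContext(
    thread_name="stg_orders",
    thread_qualified_key="staging.stg_orders",
    thread_source="orders_raw",
    thread_sources_json=json.dumps(
        [{"name": "orders_raw", "alias": "orders_raw", "type": "primary"}]
    ),
    weave_name="staging",
    loom_name="my-project",
    run_timestamp=datetime.now(timezone.utc).isoformat(),
    run_id=str(uuid.uuid4()),
)
```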

ValidationOutcome

Result of validating a DataFrame against a set of rules.

Attributes:

  • clean_df: Rows that passed all error-severity rules.
  • quarantine_df: Rows that failed at least one error-severity rule, exploded by failed rule with metadata columns. None when no error-severity rules exist or when a fatal rule triggered.
  • validation_results: Per-rule pass/fail counts.
  • has_fatal: True if any fatal-severity rule had failures.

__init__(clean_df, quarantine_df, validation_results, has_fatal)

Initialize with clean/quarantine splits and validation results.

evaluate_assertions(spark, assertions, target_path)

Evaluate post-write assertions against a Delta target table.

Supports four assertion types: row_count, column_not_null, unique, and expression. Each assertion produces an AssertionResult with pass/fail status, details, and the configured severity.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • assertions (list[Assertion], required): List of assertion definitions to evaluate.
  • target_path (str, required): Path to the Delta table to assert against.

Returns:

  • list[AssertionResult]: List of AssertionResult, one per input assertion.
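
A minimal usage sketch. The Assertion model's own fields are not documented on this page, so the assertions are assumed to come from a parsed thread configuration; `spark`, `thread`, and the target path are placeholders.

```python
from weevr.operations import evaluate_assertions

# `spark` is an active SparkSession; `thread.assertions` is assumed to hold
# the list of Assertion definitions parsed from the thread config.
results = evaluate_assertions(spark, thread.assertions, "Tables/staging/stg_orders")

for result in results:
    # Each AssertionResult carries pass/fail status, details, and severity.
    print(result)
```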

build_sources_json(sources)

Build the ${thread.sources} JSON array from thread source definitions.

Each source entry includes name, alias (if available), and type classification (primary for the first source, lookup for lookup references, secondary for all others).

Parameters:

  • sources (Mapping[str, object], required): Thread source definitions keyed by alias.

Returns:

  • str: JSON string array of source metadata.
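
A quick sketch of the call and the rough shape of the output for a thread with a primary source and a lookup; `thread.sources` is an assumed attribute name and the printed value is illustrative, not captured output.

```python
from weevr.operations import build_sources_json

# `thread.sources` is assumed to be the thread's source mapping keyed by alias.
sources_json = build_sources_json(thread.sources)
print(sources_json)
# Roughly: [{"name": "orders_raw", "alias": "orders", "type": "primary"},
#           {"name": "country_codes", "alias": "countries", "type": "lookup"}]
```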

inject_audit_columns(df, audit_columns, context)

Resolve context variables, validate, and inject audit columns.

Each audit column expression is first processed for context variable substitution, then evaluated as a Spark SQL expression via F.expr().

Parameters:

  • df (DataFrame, required): Input DataFrame.
  • audit_columns (dict[str, str], required): Audit column definitions as {name: expression}.
  • context (AuditContext, required): Execution context for variable resolution.

Returns:

  • DataFrame: DataFrame with audit columns appended.

Raises:

  • ExecutionError: If an audit column name conflicts with an existing DataFrame column.
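
A minimal sketch of the two-stage resolution. The specific ${...} placeholder names are assumptions based on the AuditContext attributes above; `df` and `context` are assumed to exist already.

```python
from weevr.operations import inject_audit_columns

# Expressions may mix context placeholders with Spark SQL: placeholders are
# substituted first, then each string is evaluated with F.expr().
audit_columns = {
    "_loaded_at": "current_timestamp()",
    "_run_id": "'${run.id}'",
    "_source_thread": "'${thread.qualified_key}'",
}

df = inject_audit_columns(df, audit_columns, context)
# Raises ExecutionError if, for example, df already has a `_run_id` column.
```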

resolve_audit_columns(loom_audit, weave_audit, thread_audit)

Merge audit columns additively: loom -> weave -> thread.

Lower levels extend the column set. Same-named columns at a lower level override the expression from a higher level.

Parameters:

  • loom_audit (dict[str, str] | None, required): Audit columns from loom defaults.
  • weave_audit (dict[str, str] | None, required): Audit columns from weave defaults.
  • thread_audit (dict[str, str] | None, required): Audit columns from thread target.

Returns:

  • dict[str, str]: Merged audit column dict. Empty dict if none defined.
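
A small sketch of the additive merge. The dictionaries are hypothetical, but the precedence (thread over weave over loom for same-named columns) follows the description above.

```python
from weevr.operations import resolve_audit_columns

loom_audit = {"_loaded_at": "current_timestamp()", "_loom": "'${loom.name}'"}
weave_audit = {"_weave": "'${weave.name}'"}
thread_audit = {"_loaded_at": "date_trunc('second', current_timestamp())"}

merged = resolve_audit_columns(loom_audit, weave_audit, thread_audit)
# merged contains _loaded_at (the thread expression wins), _loom, and _weave.
```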

resolve_export_path(path, context)

Substitute context variables in an export path string.

Delegates to the shared resolve_context_variables() in the audit module, which handles ${thread.*}, ${weave.*}, ${loom.*}, and ${run.*} namespaces.

Parameters:

  • path (str, required): Export path with variable placeholders.
  • context (AuditContext, required): Execution context providing variable values.

Returns:

  • str: Path with all context variables resolved. Unknown variables are left as-is.
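
A quick sketch, reusing the `context` built in the AuditContext example earlier; the individual placeholder names are assumed to mirror the context attributes.

```python
from weevr.operations import resolve_export_path

path = "Files/exports/${weave.name}/${thread.name}_${run.id}.parquet"
resolved = resolve_export_path(path, context)
# e.g. "Files/exports/staging/stg_orders_<uuid>.parquet"; an unknown
# placeholder such as ${thread.unknown} would be left untouched.
```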

resolve_exports(exports, context)

Resolve context variables in export path fields.

Creates new Export instances with resolved paths. Exports using alias (no path) are returned unchanged.

Parameters:

  • exports (list[Export], required): Export definitions with potential variable placeholders.
  • context (AuditContext, required): Execution context for variable resolution.

Returns:

  • list[Export]: List of exports with resolved paths.

write_export(spark, df, export, *, row_count=None)

Write a DataFrame to an export target.

Dispatches to the appropriate Spark format writer based on the export type. Captures timing, row count, and any errors.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • df (DataFrame, required): DataFrame to write (post-mapping, audit-injected).
  • export (Export, required): Export configuration with resolved path.
  • row_count (int | None, default None): Pre-computed row count to avoid an extra df.count() action. When None, the count is computed before writing.

Returns:

  • ExportResult: ExportResult with write metrics and status.

compute_keys(df, keys)

Apply key and hash computations declared in a KeyConfig.

Steps are applied in this order:

  1. Validate business key columns exist.
  2. Compute surrogate key (if configured).
  3. Compute change detection hash (if configured).

Parameters:

  • df (DataFrame, required): Input DataFrame.
  • keys (KeyConfig, required): Key configuration: business key columns, surrogate key spec, change detection spec.

Returns:

  • DataFrame: DataFrame with surrogate key and/or change detection hash columns appended.

Raises:

  • ExecutionError: If any declared business key column is missing from df.
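
The KeyConfig fields are not documented on this page, so the sketch below only shows the call shape, followed by a plain-PySpark illustration of the kind of surrogate-key hash such a step typically produces. The illustration is for intuition only and is not necessarily weevr's exact hashing scheme; `thread.target.keys` is an assumed attribute path.

```python
from pyspark.sql import functions as F

from weevr.operations import compute_keys

# Call shape: `thread.target.keys` is assumed to hold the KeyConfig.
df = compute_keys(df, thread.target.keys)

# For intuition only: a surrogate key derived from business key columns
# often looks roughly like this (not necessarily weevr's exact scheme).
manual = df.withColumn(
    "order_sk",
    F.sha2(F.concat_ws("||", F.col("order_id"), F.col("order_line")), 256),
)
```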

run_pipeline(df, steps, sources, column_sets=None, column_set_defs=None, lookups=None)

Execute a sequence of pipeline steps against a working DataFrame.

Steps are applied in order. Join and union steps may reference other loaded source DataFrames via sources. Any Spark exception raised during a step is wrapped in weevr.errors.exceptions.ExecutionError with step index and type context attached.

Parameters:

  • df (DataFrame, required): Initial working DataFrame (typically the primary source).
  • steps (list[Step], required): Ordered list of pipeline step configurations.
  • sources (dict[str, DataFrame], required): All loaded source DataFrames, keyed by alias. Required for join and union steps that reference secondary sources.
  • column_sets (dict[str, dict[str, str]] | None, default None): Pre-resolved column set mappings keyed by name. When a rename step references a column set, the resolved dict is looked up here and passed to apply_rename.
  • column_set_defs (dict[str, ColumnSet] | None, default None): Column set model instances keyed by name. Used to read on_unmapped and on_extra behaviour settings for rename steps that reference a column set.
  • lookups (dict[str, DataFrame] | None, default None): Cached lookup DataFrames keyed by name. Required for resolve steps that reference named lookups.

Returns:

  • DataFrame: Final DataFrame after all steps have been applied.

Raises:

  • ExecutionError: If any step fails, with step_index and step_type set.
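
A sketch of how the reader and pipeline pieces chain together. The step definitions are assumed to come from a parsed thread config rather than being built by hand; `thread`, `loom`, `resolved_column_sets`, and `cached_lookups` are placeholders for objects prepared elsewhere.

```python
from weevr.operations import read_sources, run_pipeline

# Load every declared source, then run the thread's steps against the
# primary source. Attribute names on `thread` and `loom` are assumed.
sources = read_sources(spark, thread.sources, connections=loom.connections)
working_df = sources[next(iter(sources))]  # first declared source is the primary

result_df = run_pipeline(
    working_df,
    thread.steps,
    sources,
    column_sets=resolved_column_sets,  # optional, for rename steps
    lookups=cached_lookups,            # optional, for resolve steps
)
```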

write_quarantine(spark, quarantine_df, target_path)

Write quarantine rows to a Delta table at {target_path}_quarantine.

Writes using overwrite mode for idempotent re-execution. Skips the write entirely if quarantine_df is None or empty.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • quarantine_df (DataFrame | None, required): DataFrame with quarantine metadata columns (__rule_name, __rule_expression, __severity, __quarantine_ts). May be None if no rows need quarantining.
  • target_path (str, required): Path of the original target Delta table. The quarantine table is written to {target_path}_quarantine.

Returns:

  • int: Number of rows written, or 0 if no write occurred.
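
A minimal sketch of quarantine handling following a validation pass (see validate_dataframe below); the target path is hypothetical and `thread.validation` is an assumed attribute name.

```python
from weevr.operations import validate_dataframe, write_quarantine

outcome = validate_dataframe(df, thread.validation)  # assumed attribute name
if not outcome.has_fatal:
    quarantined = write_quarantine(spark, outcome.quarantine_df, "Tables/staging/stg_orders")
    # quarantined == 0 when quarantine_df is None or empty; otherwise the rows
    # land in Tables/staging/stg_orders_quarantine (overwrite mode).
```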

read_source(spark, alias, source, connections=None)

Read a single source into a DataFrame.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • alias (str, required): Logical name for this source (used in error messages).
  • source (Source, required): Source configuration.
  • connections (dict[str, OneLakeConnection] | None, default None): Named connection declarations keyed by connection name. Required when source.connection is set.

Returns:

  • DataFrame: DataFrame with source data, with dedup applied if configured.

Raises:

  • ExecutionError: If the source cannot be read or the type is unsupported.

read_sources(spark, sources, connections=None)

Read all sources into a mapping of alias -> DataFrame.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • sources (dict[str, Source], required): Mapping of alias -> Source config.
  • connections (dict[str, OneLakeConnection] | None, default None): Named connection declarations forwarded to each read.

Returns:

  • dict[str, DataFrame]: Mapping of alias -> DataFrame, in the same key order as sources.
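
A short usage sketch showing the batch and single-source call shapes. The Source objects are assumed to be built by the config layer; `thread.sources` and `loom.connections` are assumed attribute names.

```python
from weevr.operations import read_source, read_sources

# Read everything declared on the thread in one go...
frames = read_sources(spark, thread.sources, connections=loom.connections)

# ...or a single source by alias, which is what read_sources does per entry.
orders_df = read_source(spark, "orders", thread.sources["orders"], connections=loom.connections)
```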

validate_dataframe(df, rules)

Evaluate all validation rules in a single pass with severity routing.

Rules are evaluated as boolean Spark SQL expressions. Each row is tagged with pass/fail per rule, then routed based on severity:

  • info/warn: logged only, rows stay in clean_df
  • error: rows moved to quarantine_df (one row per failed rule)
  • fatal: has_fatal=True, no split performed (caller should abort)

Parameters:

  • df (DataFrame, required): Input DataFrame to validate.
  • rules (list[ValidationRule], required): Validation rules to evaluate.

Returns:

  • ValidationOutcome: ValidationOutcome with clean/quarantine split and per-rule results.
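
The ValidationRule fields are not documented here, so this sketch only shows the call and how the outcome's pieces are typically consumed; `thread.validation` is an assumed attribute name.

```python
from weevr.operations import validate_dataframe

outcome = validate_dataframe(df, thread.validation)  # assumed attribute name

if outcome.has_fatal:
    raise RuntimeError("fatal validation rule failed; aborting before write")

clean_df = outcome.clean_df              # rows passing all error-severity rules
for result in outcome.validation_results:
    print(result)                        # per-rule pass/fail counts
```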

apply_target_mapping(df, target, spark)

Apply target column mapping to shape the DataFrame for writing.

Column-level transformations (expr, type, default, drop) are applied first. Then the column set is narrowed according to the mapping mode:

  • auto: if the target table already exists, keep only columns present in its schema. If it does not yet exist, all columns pass through.
  • explicit: keep only columns explicitly declared in target.columns (drop columns excluded).

Parameters:

  • df (DataFrame, required): Input DataFrame.
  • target (Target, required): Target configuration including mapping mode and column specs.
  • spark (SparkSession, required): Active SparkSession (used to probe existing target schema).

Returns:

  • DataFrame: DataFrame shaped for writing to the target.
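
A small sketch of shaping the frame before the write; `thread.target` is an assumed attribute path and `result_df` stands in for the pipeline output.

```python
from weevr.operations import apply_target_mapping

shaped_df = apply_target_mapping(result_df, thread.target, spark)
# With mapping mode "auto" and an existing table, extra columns are dropped to
# match the current schema; with "explicit", only declared columns survive.
```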

write_target(spark, df, target, write_config, target_path)

Write a DataFrame to a Delta table.

If target_path is a table alias (e.g. staging.stg_customers), the write uses saveAsTable so the metastore manages the table location. Otherwise the DataFrame is written with save directly to the file path.

Parameters:

  • spark (SparkSession, required): Active SparkSession.
  • df (DataFrame, required): DataFrame to write.
  • target (Target, required): Target configuration (partition_by).
  • write_config (WriteConfig | None, required): Write mode and merge parameters. Defaults to overwrite when None.
  • target_path (str, required): Table alias or physical path for the Delta table.

Returns:

  • int: Number of rows in df (rows written for overwrite/append; input rows for merge).

Raises:

  • ExecutionError: If the write operation fails.
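
A closing sketch of the write itself. The write_config is None here, so the documented default of overwrite applies, and the table-alias form routes through saveAsTable; `thread.target` and `shaped_df` are placeholders from the earlier sketches.

```python
from weevr.operations import write_target

rows_written = write_target(
    spark,
    shaped_df,
    thread.target,           # assumed attribute holding the Target config
    None,                    # WriteConfig; None falls back to overwrite mode
    "staging.stg_customers"  # table alias, so the metastore manages location
)
print(f"wrote {rows_written} rows")
```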

__getattr__(name)

Lazy module attribute hook: imports a public name from its owning submodule on first access, using the _LAZY_ATTRS mapping above.