Operations API¶
The weevr.operations module implements the concrete readers, transformation
steps, writers, audit column injection, export writers, and quarantine
handling that the engine dispatches to. Each pipeline step type maps to
an operation function.
weevr.operations
¶
weevr operations — readers, transforms, hashing, mapping, writers, exports, validation.
__all__ = ['AuditContext', 'apply_target_mapping', 'build_sources_json', 'compute_keys', 'evaluate_assertions', 'inject_audit_columns', 'read_source', 'read_sources', 'resolve_audit_columns', 'resolve_export_path', 'resolve_exports', 'run_pipeline', 'validate_dataframe', 'ValidationOutcome', 'write_export', 'write_quarantine', 'write_target']
module-attribute
¶
_LAZY_ATTRS = {'AuditContext': 'weevr.operations.audit', 'build_sources_json': 'weevr.operations.audit', 'inject_audit_columns': 'weevr.operations.audit', 'resolve_audit_columns': 'weevr.operations.audit', 'evaluate_assertions': 'weevr.operations.assertions', 'resolve_export_path': 'weevr.operations.exports', 'resolve_exports': 'weevr.operations.exports', 'write_export': 'weevr.operations.exports', 'compute_keys': 'weevr.operations.hashing', 'run_pipeline': 'weevr.operations.pipeline', 'write_quarantine': 'weevr.operations.quarantine', 'read_source': 'weevr.operations.readers', 'read_sources': 'weevr.operations.readers', 'ValidationOutcome': 'weevr.operations.validation', 'validate_dataframe': 'weevr.operations.validation', 'apply_target_mapping': 'weevr.operations.writers', 'write_target': 'weevr.operations.writers'}
module-attribute
¶
AuditContext
dataclass
¶
Execution context for resolving audit column variables.
Attributes:
| Name | Type | Description |
|---|---|---|
| `thread_name` | `str` | Thread name. |
| `thread_qualified_key` | `str` | Fully qualified thread key. |
| `thread_source` | `str \| None` | Primary source alias (first declared source). |
| `thread_sources_json` | `str` | JSON array of all sources with type metadata. |
| `weave_name` | `str` | Weave name. |
| `loom_name` | `str` | Loom name. |
| `run_timestamp` | `str` | ISO 8601 UTC timestamp of execution start. |
| `run_id` | `str` | UUID4 identifier unique per execution. |
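For illustration, a minimal sketch of building an `AuditContext` by hand, assuming the attributes above make up the full constructor; in normal use the engine derives these values from the loaded loom, weave, and thread configuration, and the literal values below are hypothetical.

```python
import json
import uuid
from datetime import datetime, timezone

from weevr.operations import AuditContext

# Hypothetical values -- the engine normally derives these from configuration.
context = AuditContext(
    thread_name="customers",
    thread_qualified_key="silver.customers",
    thread_source="raw_customers",
    thread_sources_json=json.dumps([{"name": "raw_customers", "type": "primary"}]),
    weave_name="silver",
    loom_name="lakehouse",
    run_timestamp=datetime.now(timezone.utc).isoformat(),
    run_id=str(uuid.uuid4()),
)
```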
ValidationOutcome
¶
Result of validating a DataFrame against a set of rules.
Attributes:
| Name | Description |
|---|---|
| `clean_df` | Rows that passed all error-severity rules. |
| `quarantine_df` | Rows that failed at least one error-severity rule, exploded by failed rule with metadata columns. None when no error-severity rules exist or when a fatal rule triggered. |
| `validation_results` | Per-rule pass/fail counts. |
| `has_fatal` | True if any fatal-severity rule had failures. |
__init__(clean_df, quarantine_df, validation_results, has_fatal)
¶
Initialize with clean/quarantine splits and validation results.
evaluate_assertions(spark, assertions, target_path)
¶
Evaluate post-write assertions against a Delta target table.
Supports four assertion types: row_count, column_not_null, unique, and expression. Each assertion produces an AssertionResult with pass/fail status, details, and the configured severity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | required |
| `assertions` | `list[Assertion]` | List of assertion definitions to evaluate. | required |
| `target_path` | `str` | Path to the Delta table to assert against. | required |

Returns:

| Type | Description |
|---|---|
| `list[AssertionResult]` | List of AssertionResult, one per input assertion. |
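A hedged usage sketch, assuming `spark` is an active SparkSession and `assertions` has already been parsed from the thread configuration (the `Assertion` model's constructor is not documented on this page, and the path is illustrative):

```python
from weevr.operations import evaluate_assertions

# `assertions` is assumed to be a list[Assertion] loaded from configuration.
results = evaluate_assertions(spark, assertions, "/lakehouse/silver/customers")

for result in results:
    # Each AssertionResult carries pass/fail status, details, and the configured severity.
    print(result)
```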
build_sources_json(sources)
¶
Build the ${thread.sources} JSON array from thread source definitions.
Each source entry includes name, alias (if available), and type classification (primary for the first source, lookup for lookup references, secondary for all others).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `sources` | `Mapping[str, object]` | Thread source definitions keyed by alias. | required |

Returns:

| Type | Description |
|---|---|
| `str` | JSON string array of source metadata. |
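A short sketch, assuming `thread.sources` holds the loaded thread source definitions; the printed shape is illustrative (only the documented name/alias/type classification is guaranteed here):

```python
import json

from weevr.operations import build_sources_json

# `thread.sources` is assumed to be the thread's source definitions keyed by alias.
sources_json = build_sources_json(thread.sources)

# Illustrative shape only -- the first source is classified "primary", lookup
# references "lookup", and everything else "secondary":
# [{"name": "raw_customers", "type": "primary"}, {"name": "orders", "type": "secondary"}]
print(json.loads(sources_json))
```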
inject_audit_columns(df, audit_columns, context)
¶
Resolve context variables, validate, and inject audit columns.
Each audit column expression is first processed for context variable
substitution, then evaluated as a Spark SQL expression via F.expr().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Input DataFrame. | required |
| `audit_columns` | `dict[str, str]` | Audit column definitions as name -> expression pairs. | required |
| `context` | `AuditContext` | Execution context for variable resolution. | required |

Returns:

| Type | Description |
|---|---|
| `DataFrame` | DataFrame with audit columns appended. |

Raises:

| Type | Description |
|---|---|
| `ExecutionError` | If an audit column name conflicts with an existing DataFrame column. |
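A short sketch of injecting audit columns, assuming `df` and `context` already exist; the column names and `${...}` variable names below are illustrative assumptions:

```python
from weevr.operations import inject_audit_columns

# Hypothetical audit column definitions (name -> expression). The ${...}
# variable names are assumptions based on the documented namespaces.
audit_columns = {
    "_loaded_at": "current_timestamp()",
    "_run_id": "'${run.id}'",
    "_source_thread": "'${thread.name}'",
}

# Raises ExecutionError if any of these names already exist on `df`.
audited_df = inject_audit_columns(df, audit_columns, context)
```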
resolve_audit_columns(loom_audit, weave_audit, thread_audit)
¶
Merge audit columns additively: loom -> weave -> thread.
Lower levels extend the column set. Same-named columns at a lower level override the expression from a higher level.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `loom_audit` | `dict[str, str] \| None` | Audit columns from loom defaults. | required |
| `weave_audit` | `dict[str, str] \| None` | Audit columns from weave defaults. | required |
| `thread_audit` | `dict[str, str] \| None` | Audit columns from thread target. | required |

Returns:

| Type | Description |
|---|---|
| `dict[str, str]` | Merged audit column dict. Empty dict if none defined. |
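A small sketch of the additive merge; the column names and expressions are illustrative:

```python
from weevr.operations import resolve_audit_columns

merged = resolve_audit_columns(
    loom_audit={"_loaded_at": "current_timestamp()", "_layer": "'bronze'"},
    weave_audit={"_layer": "'silver'"},        # lower level overrides the loom expression
    thread_audit={"_run_id": "'${run.id}'"},   # lower level extends the column set
)
# merged == {"_loaded_at": "current_timestamp()", "_layer": "'silver'", "_run_id": "'${run.id}'"}
```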
resolve_export_path(path, context)
¶
Substitute context variables in an export path string.
Delegates to the shared resolve_context_variables() in the audit
module, which handles ${thread.*}, ${weave.*}, ${loom.*},
and ${run.*} namespaces.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Export path with variable placeholders. | required |
| `context` | `AuditContext` | Execution context providing variable values. | required |

Returns:

| Type | Description |
|---|---|
| `str` | Path with all context variables resolved. Unknown variables are left as-is. |
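A minimal sketch, assuming `context` is an `AuditContext`; the placeholder names are assumptions based on the documented namespaces:

```python
from weevr.operations import resolve_export_path

# Any variable the context cannot resolve is left in the path unchanged.
path = resolve_export_path("exports/${weave.name}/${thread.name}.parquet", context)
```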
resolve_exports(exports, context)
¶
Resolve context variables in export path fields.
Creates new Export instances with resolved paths. Exports using
alias (no path) are returned unchanged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `exports` | `list[Export]` | Export definitions with potential variable placeholders. | required |
| `context` | `AuditContext` | Execution context for variable resolution. | required |

Returns:

| Type | Description |
|---|---|
| `list[Export]` | List of exports with resolved paths. |
write_export(spark, df, export, *, row_count=None)
¶
Write a DataFrame to an export target.
Dispatches to the appropriate Spark format writer based on the export type. Captures timing, row count, and any errors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | required |
| `df` | `DataFrame` | DataFrame to write (post-mapping, audit-injected). | required |
| `export` | `Export` | Export configuration with resolved path. | required |
| `row_count` | `int \| None` | Pre-computed row count to avoid an extra count. | `None` |

Returns:

| Type | Description |
|---|---|
| `ExportResult` | ExportResult with write metrics and status. |
compute_keys(df, keys)
¶
Apply key and hash computations declared in a `KeyConfig`.

Steps are applied in this order:

1. Validate business key columns exist.
2. Compute surrogate key (if configured).
3. Compute change detection hash (if configured).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Input DataFrame. | required |
| `keys` | `KeyConfig` | Key configuration: business key columns, surrogate key spec, change detection spec. | required |

Returns:

| Type | Description |
|---|---|
| `DataFrame` | DataFrame with surrogate key and/or change detection hash columns appended. |

Raises:

| Type | Description |
|---|---|
| `ExecutionError` | If any declared business key column is missing from the input DataFrame. |
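A hedged usage sketch, assuming `df` already exists and `keys` is a `KeyConfig` parsed from the thread definition (its constructor is not documented here):

```python
from weevr.operations import compute_keys

# `keys` declares business key columns plus optional surrogate key and
# change detection hash specs.
keyed_df = compute_keys(df, keys)
# Raises ExecutionError if any declared business key column is missing from df.
```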
run_pipeline(df, steps, sources, column_sets=None, column_set_defs=None, lookups=None)
¶
Execute a sequence of pipeline steps against a working DataFrame.
Steps are applied in order. Join and union steps may reference other
loaded source DataFrames via `sources`. Any Spark exception raised
during a step is wrapped in `ExecutionError` (from `weevr.errors.exceptions`)
with step index and type context attached.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Initial working DataFrame (typically the primary source). | required |
| `steps` | `list[Step]` | Ordered list of pipeline step configurations. | required |
| `sources` | `dict[str, DataFrame]` | All loaded source DataFrames, keyed by alias. Required for join and union steps that reference secondary sources. | required |
| `column_sets` | `dict[str, dict[str, str]] \| None` | Pre-resolved column set mappings keyed by name. When a rename step references a column set, the resolved dict is looked up here and applied as the rename mapping. | `None` |
| `column_set_defs` | `dict[str, ColumnSet] \| None` | Column set model instances keyed by name. | `None` |
| `lookups` | `dict[str, DataFrame] \| None` | Cached lookup DataFrames keyed by name. Required for resolve steps that reference named lookups. | `None` |

Returns:

| Type | Description |
|---|---|
| `DataFrame` | Final DataFrame after all steps have been applied. |

Raises:

| Type | Description |
|---|---|
| `ExecutionError` | If any step fails, with `step_index` and `step_type` set. |
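A hedged sketch of running a step sequence, assuming `steps` and `lookups` come from the loaded thread configuration and `sources` from `read_sources()`; the primary source alias is illustrative:

```python
from weevr.operations import run_pipeline

result_df = run_pipeline(
    df=sources["raw_customers"],
    steps=steps,
    sources=sources,
    lookups=lookups,  # only needed when resolve steps reference named lookups
)
```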
write_quarantine(spark, quarantine_df, target_path)
¶
Write quarantine rows to a Delta table at `{target_path}_quarantine`.

Writes using overwrite mode for idempotent re-execution. Skips the write entirely if `quarantine_df` is None or empty.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | required |
| `quarantine_df` | `DataFrame \| None` | DataFrame with quarantine metadata columns (`__rule_name`, `__rule_expression`, `__severity`, `__quarantine_ts`). May be None if no rows need quarantining. | required |
| `target_path` | `str` | Path of the original target Delta table. The quarantine table is written to `{target_path}_quarantine`. | required |

Returns:

| Type | Description |
|---|---|
| `int` | Number of rows written, or 0 if no write occurred. |
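A minimal sketch, assuming `outcome` is a `ValidationOutcome` from `validate_dataframe()`; the path is illustrative:

```python
from weevr.operations import write_quarantine

# Writes to "/lakehouse/silver/customers_quarantine"; returns 0 without
# writing when quarantine_df is None or empty.
rows_quarantined = write_quarantine(spark, outcome.quarantine_df, "/lakehouse/silver/customers")
```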
read_source(spark, alias, source, connections=None)
¶
Read a single source into a DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | required |
| `alias` | `str` | Logical name for this source (used in error messages). | required |
| `source` | `Source` | Source configuration. | required |
| `connections` | `dict[str, OneLakeConnection] \| None` | Named connection declarations keyed by connection name. Required when the source references a named connection. | `None` |

Returns:

| Type | Description |
|---|---|
| `DataFrame` | DataFrame with source data, with dedup applied if configured. |

Raises:

| Type | Description |
|---|---|
| `ExecutionError` | If the source cannot be read or the type is unsupported. |
read_sources(spark, sources, connections=None)
¶
Read all sources into a mapping of alias -> DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | required |
| `sources` | `dict[str, Source]` | Mapping of alias -> Source config. | required |
| `connections` | `dict[str, OneLakeConnection] \| None` | Named connection declarations forwarded to each read. | `None` |

Returns:

| Type | Description |
|---|---|
| `dict[str, DataFrame]` | Mapping of alias -> DataFrame, in the same key order as `sources`. |
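A hedged sketch of loading all sources, assuming `thread.sources` and `connections` come from the loaded configuration; the alias is illustrative:

```python
from weevr.operations import read_sources

# Connections are only needed when a source references one by name.
dataframes = read_sources(spark, thread.sources, connections=connections)
primary_df = dataframes["raw_customers"]  # illustrative alias
```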
validate_dataframe(df, rules)
¶
Evaluate all validation rules in a single pass with severity routing.
Rules are evaluated as boolean Spark SQL expressions. Each row is tagged with pass/fail per rule, then routed based on severity:

- info/warn: logged only, rows stay in `clean_df`
- error: rows moved to `quarantine_df` (one row per failed rule)
- fatal: `has_fatal=True`, no split performed (caller should abort)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Input DataFrame to validate. | required |
| `rules` | `list[ValidationRule]` | Validation rules to evaluate. | required |

Returns:

| Type | Description |
|---|---|
| `ValidationOutcome` | ValidationOutcome with clean/quarantine split and per-rule results. |
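A short sketch of the severity routing, assuming `rules` is a `list[ValidationRule]` parsed from the thread configuration:

```python
from weevr.operations import validate_dataframe

outcome = validate_dataframe(df, rules)

if outcome.has_fatal:
    raise RuntimeError("a fatal-severity rule failed")  # caller decides how to abort

clean_df = outcome.clean_df            # rows passing all error-severity rules
quarantine_df = outcome.quarantine_df  # may be None when nothing was quarantined
```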
apply_target_mapping(df, target, spark)
¶
Apply target column mapping to shape the DataFrame for writing.
Column-level transformations (expr, type, default, drop) are applied first. Then the column set is narrowed according to the mapping mode:

- auto: if the target table already exists, keep only columns present in its schema. If it does not yet exist, all columns pass through.
- explicit: keep only columns explicitly declared in `target.columns` (columns marked `drop` are excluded).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Input DataFrame. | required |
| `target` | `Target` | Target configuration including mapping mode and column specs. | required |
| `spark` | `SparkSession` | Active SparkSession (used to probe existing target schema). | required |

Returns:

| Type | Description |
|---|---|
| `DataFrame` | DataFrame shaped for writing to the target. |
write_target(spark, df, target, write_config, target_path)
¶
Write a DataFrame to a Delta table.
If `target_path` is a table alias (e.g. `staging.stg_customers`), the
write uses `saveAsTable` so the metastore manages the table location.
Otherwise, `save` writes directly to the file path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Active SparkSession. | required |
| `df` | `DataFrame` | DataFrame to write. | required |
| `target` | `Target` | Target configuration (`partition_by`). | required |
| `write_config` | `WriteConfig \| None` | Write mode and merge parameters. Defaults to overwrite when None. | required |
| `target_path` | `str` | Table alias or physical path for the Delta table. | required |

Returns:

| Type | Description |
|---|---|
| `int` | Number of rows in the written DataFrame. |

Raises:

| Type | Description |
|---|---|
| `ExecutionError` | If the write operation fails. |
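A hedged end-to-end sketch of shaping and writing a target, assuming `df`, `target`, and `spark` already exist; the table alias is illustrative:

```python
from weevr.operations import apply_target_mapping, write_target

# Shape the DataFrame for the target (column transforms + mapping mode).
shaped_df = apply_target_mapping(df, target, spark)

rows_written = write_target(
    spark,
    shaped_df,
    target,
    write_config=None,                    # None defaults to overwrite
    target_path="staging.stg_customers",  # table alias -> saveAsTable; a file path -> save
)
```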