Add Validation Rules
Goal: Define pre-write validation rules on a thread to catch data quality issues before they reach the target table. Configure severity levels to control whether bad rows are logged, quarantined, or cause the thread to abort.
Prerequisites
- An existing thread YAML with sources, steps, and a target
- Understanding of Spark SQL expression syntax (rules are evaluated as boolean expressions)
Severity levels
Each validation rule has a severity that determines what happens when rows fail:
| Severity | Behavior |
|---|---|
| info | Logged in telemetry. No effect on data flow. |
| warn | Logged with elevated visibility. No effect on data flow. |
| error | Failing rows are quarantined to {target_path}_quarantine. Clean rows proceed. |
| fatal | Any failure aborts the thread. No data is written. |
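
The table can be read as a small decision function. The sketch below is a plain-Python illustration of that mapping only; `Severity`, `outcome_for`, and the returned labels are hypothetical names chosen for this example and are not part of weevr's API.

```python
from enum import Enum


class Severity(Enum):
    # Values match the severity strings used in the thread YAML.
    INFO = "info"
    WARN = "warn"
    ERROR = "error"
    FATAL = "fatal"


def outcome_for(severity: Severity, rows_failed: int) -> str:
    """Describe the effect on data flow for one rule's result (illustrative labels)."""
    if rows_failed == 0:
        return "pass"
    if severity in (Severity.INFO, Severity.WARN):
        return "log_only"                 # recorded in telemetry, data flow unchanged
    if severity is Severity.ERROR:
        return "quarantine_failing_rows"  # failing rows diverted, clean rows proceed
    return "abort_thread"                 # fatal: nothing is written


for level in Severity:
    print(level.value, "->", outcome_for(level, rows_failed=3))
```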
Step 1 -- Add validation rules to a thread
Add a validations section to your thread YAML. Each rule needs a name, a
rule expression (Spark SQL boolean), and a severity:

    config_version: "1.0"
    sources:
      customers:
        type: delta
        alias: bronze.customers
    steps:
      - derive:
          columns:
            full_name: "concat(first_name, ' ', last_name)"
    validations:
      - name: email_not_null
        rule: "email IS NOT NULL"
        severity: error
      - name: positive_balance
        rule: "balance >= 0"
        severity: fatal
      - name: phone_length
        rule: "LENGTH(phone) >= 10"
        severity: warn
      - name: valid_status
        rule: "status IN ('active', 'inactive', 'suspended')"
        severity: info
    target:
      path: Tables/dim_customers
    write:
      mode: overwrite

Rules are evaluated in a single pass over the DataFrame. Each row is tagged with pass/fail for every rule, then routed based on severity.
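
To make the tag-then-route idea concrete, here is a minimal plain-Python simulation. It is a sketch under stated assumptions, not weevr's implementation: rows are dicts, Python predicates stand in for the Spark SQL expressions, and `tag_rows` and `route` are hypothetical helper names. It also assumes a row that fails any error rule is quarantined once, and that a fatal failure is raised before anything is returned.

```python
# Each rule: (name, predicate standing in for the Spark SQL expression, severity)
RULES = [
    ("email_not_null", lambda r: r["email"] is not None, "error"),
    ("positive_balance", lambda r: r["balance"] >= 0, "fatal"),
    ("phone_length", lambda r: len(r["phone"]) >= 10, "warn"),
]


def tag_rows(rows, rules):
    """Evaluate every rule for every row in one pass; return (row, tags) pairs."""
    return [(row, {name: bool(pred(row)) for name, pred, _ in rules}) for row in rows]


def route(tagged, rules):
    """Split tagged rows into (clean, quarantined); raise if a fatal rule failed."""
    severity = {name: sev for name, _, sev in rules}
    failed_fatal = sorted(
        {n for _, tags in tagged for n, ok in tags.items()
         if not ok and severity[n] == "fatal"}
    )
    if failed_fatal:
        # Abort before any row is routed, so nothing would be written.
        raise RuntimeError(f"fatal rule(s) failed: {failed_fatal}")
    clean, quarantined = [], []
    for row, tags in tagged:
        has_error = any(not ok and severity[n] == "error" for n, ok in tags.items())
        (quarantined if has_error else clean).append(row)
    return clean, quarantined


rows = [
    {"email": "a@example.com", "balance": 10.0, "phone": "5551234567"},
    {"email": None, "balance": 5.0, "phone": "5551234567"},      # fails an error rule
    {"email": "c@example.com", "balance": 0.0, "phone": "123"},  # fails a warn rule only
]
clean, quarantined = route(tag_rows(rows, RULES), RULES)
print(len(clean), "clean,", len(quarantined), "quarantined")  # 2 clean, 1 quarantined
```

The row that fails only the warn rule stays in the clean set, which matches the "no effect on data flow" behavior described for info and warn.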
Step 2 -- Understand quarantine behavior
When a rule with error severity fails, the offending rows are split from the
main DataFrame and written to a quarantine Delta table at
{target_path}_quarantine. The quarantine table includes metadata columns:
| Column | Description |
|---|---|
| __rule_name | Name of the failed validation rule |
| __rule_expression | The Spark SQL expression that was evaluated |
| __severity | Severity level of the rule |
| __quarantine_ts | Timestamp when the row was quarantined |
All original data columns are preserved alongside these metadata columns, making it straightforward to inspect and reprocess quarantined records.
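
The shape of a quarantined record can be sketched in plain Python as the original row plus the four metadata columns. The helpers `to_quarantine_record` and `strip_metadata` are hypothetical names for illustration; the sketch also assumes that no source column starts with a double underscore, and it does not claim how weevr represents a row that fails more than one rule.

```python
from datetime import datetime, timezone

META_PREFIX = "__"


def to_quarantine_record(row, rule_name, rule_expression, severity="error"):
    """Return the original columns plus the four quarantine metadata columns."""
    return {
        **row,
        "__rule_name": rule_name,
        "__rule_expression": rule_expression,
        "__severity": severity,
        "__quarantine_ts": datetime.now(timezone.utc),
    }


def strip_metadata(record):
    """Recover the original data columns, e.g. before reprocessing a corrected row."""
    return {k: v for k, v in record.items() if not k.startswith(META_PREFIX)}


row = {"customer_id": 42, "email": None, "balance": 5.0}
record = to_quarantine_record(row, "email_not_null", "email IS NOT NULL")
print(sorted(record))
print(strip_metadata(record) == row)  # True
```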
Fatal rules abort early
If any fatal-severity rule has failing rows, the entire thread aborts
immediately. No data is written to the target or quarantine table. Use
fatal for invariants that indicate corrupted or fundamentally invalid
source data.
Step 3 -- Add post-execution assertions
Assertions validate outcomes after data is written. They check properties of the final target dataset:

    assertions:
      - type: row_count
        min: 100
        max: 1000000
        severity: warn
      - type: column_not_null
        columns:
          - customer_id
          - email
        severity: error
      - type: unique
        columns:
          - customer_id
        severity: error

Available assertion types:
- row_count -- Verify the target row count falls within bounds
- column_not_null -- Verify specified columns contain no nulls
- unique -- Verify specified columns form a unique key
- expression -- Evaluate a custom Spark SQL expression
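
As a rough illustration of what the first three assertion types check, the following plain-Python sketch applies them to a list of dicts. The function names are hypothetical, and treating the row-count bounds as inclusive is an assumption made for this example rather than documented behavior.

```python
def check_row_count(rows, min_rows=None, max_rows=None):
    """True if the row count lies within the (assumed inclusive) bounds."""
    n = len(rows)
    return (min_rows is None or n >= min_rows) and (max_rows is None or n <= max_rows)


def check_column_not_null(rows, columns):
    """True if none of the listed columns contains a null (None) value."""
    return all(row.get(col) is not None for row in rows for col in columns)


def check_unique(rows, columns):
    """True if the listed columns together form a unique key."""
    keys = [tuple(row.get(col) for col in columns) for row in rows]
    return len(keys) == len(set(keys))


target_rows = [
    {"customer_id": 1, "email": "a@example.com"},
    {"customer_id": 2, "email": "b@example.com"},
    {"customer_id": 2, "email": None},  # duplicate key and a null email
]
print(check_row_count(target_rows, min_rows=1, max_rows=10))         # True
print(check_column_not_null(target_rows, ["customer_id", "email"]))  # False
print(check_unique(target_rows, ["customer_id"]))                    # False
```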
Step 4 -- Verify results via telemetry
After execution, inspect the RunResult to see validation outcomes:

    from weevr import Context

    # spark is assumed to be an active SparkSession (e.g. predefined in a notebook)
    ctx = Context(spark, "my-project.weevr")
    result = ctx.run("staging/stg_customers.thread")

    # Check overall status
    print(result.status)  # "success", "failure", or "partial"

    # Access per-rule results through telemetry
    if result.telemetry:
        for vr in result.telemetry.validation_results:
            print(f"{vr.rule_name}: {vr.rows_passed} passed, {vr.rows_failed} failed")
        for ar in result.telemetry.assertion_results:
            print(f"{ar.assertion_type}: {'PASS' if ar.passed else 'FAIL'} - {ar.details}")

The telemetry object also reports rows_quarantined, the number of rows
diverted to the quarantine table, alongside rows_written for the main target.
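
These counts lend themselves to simple health metrics after a run. The sketch below uses a stand-in `ValidationResult` dataclass that only mirrors the field names printed in the example (rule_name, rows_passed, rows_failed); it is not weevr's class, and `failure_rate` and `quarantine_ratio` are hypothetical helpers.

```python
from dataclasses import dataclass


@dataclass
class ValidationResult:
    # Stand-in with the same field names as the per-rule results shown above.
    rule_name: str
    rows_passed: int
    rows_failed: int


def failure_rate(vr: ValidationResult) -> float:
    """Fraction of evaluated rows that failed the rule (0.0 when no rows were seen)."""
    total = vr.rows_passed + vr.rows_failed
    return vr.rows_failed / total if total else 0.0


def quarantine_ratio(rows_written: int, rows_quarantined: int) -> float:
    """Share of processed rows that ended up in the quarantine table."""
    total = rows_written + rows_quarantined
    return rows_quarantined / total if total else 0.0


results = [
    ValidationResult("email_not_null", rows_passed=990, rows_failed=10),
    ValidationResult("phone_length", rows_passed=1000, rows_failed=0),
]
for vr in results:
    print(f"{vr.rule_name}: {failure_rate(vr):.1%} failed")
```

In a real run, the same functions could be fed from the fields on the telemetry object, for example to raise an alert when the quarantine ratio crosses a threshold chosen by the team.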