Skip to content

Thread YAML Schema

A thread is the smallest unit of work in weevr. It defines one or more data sources, an ordered sequence of transformation steps, and a single output target. This page documents every key accepted inside a thread YAML file.


Top-level keys

Key Type Required Default Description
config_version string yes -- Schema version identifier (e.g. "1")
name string no "" Human-readable thread name. Typically set by the weave that references this thread.
sources dict[string, Source] yes -- Named data sources keyed by alias
steps list[Step] no [] Ordered transformation pipeline
target Target yes -- Output destination
write WriteConfig no null Write mode and merge settings
keys KeyConfig no null Business key, surrogate key, and change detection
validations list[ValidationRule] no null Pre-write data quality rules
assertions list[Assertion] no null Post-execution assertions on the target
load LoadConfig no null Incremental load and watermark settings
tags list[string] no null Free-form tags for filtering and organization
params dict[string, ParamSpec] no null Typed parameter declarations
defaults dict[string, any] no null Default values inherited by nested structures
failure FailureConfig no null Per-thread failure handling policy
execution ExecutionConfig no null Runtime settings (logging, tracing)
cache bool no null Whether to cache the final DataFrame before writing

sources

Each key in sources is a logical alias used to reference the source in subsequent steps (e.g. in join, union). The value is a Source object.

Key Type Required Default Description
type string yes -- Source type: "delta", "csv", "json", "parquet", "excel"
alias string no null Lakehouse table alias. Required when type is "delta".
path string no null File path. Required for file-based types (csv, json, parquet, excel).
options dict[string, any] no {} Reader options passed to Spark (e.g. header, delimiter)
dedup DedupConfig no null Deduplication applied immediately after reading

sources.dedup

Key Type Required Default Description
keys list[string] yes -- Columns used for deduplication grouping
order_by string no null Sort expression to determine which row to keep
sources:
  orders:
    type: delta
    alias: raw_orders
    dedup:
      keys: [order_id]
      order_by: "updated_at DESC"
  regions:
    type: csv
    path: /mnt/data/regions.csv
    options:
      header: "true"
      delimiter: ","

steps

An ordered list of transformation steps. Each step is a single-key object where the key identifies the step type. weevr supports 19 step types.

filter

Filter rows using a Spark SQL expression.

Key Type Required Default Description
expr SparkExpr yes -- Boolean Spark SQL expression
- filter:
    expr: "amount > 0 AND status != 'cancelled'"

derive

Create one or more new columns from Spark SQL expressions.

Key Type Required Default Description
columns dict[string, SparkExpr] yes -- Map of output column name to expression
- derive:
    columns:
      total_amount: "quantity * unit_price"
      order_year: "year(order_date)"

join

Join the current DataFrame with another named source.

Key Type Required Default Description
source string yes -- Name of the source to join with
type string no "inner" Join type: inner, left, right, full, cross, semi, anti
on list[string or JoinKeyPair] yes -- Join keys. Strings are treated as same-name keys on both sides.
null_safe bool no true Use null-safe equality for join conditions

A JoinKeyPair has left and right fields for asymmetric key names.

- join:
    source: regions
    type: left
    on:
      - region_id
      - { left: src_code, right: region_code }

select

Keep only the listed columns.

Key Type Required Default Description
columns list[string] yes -- Column names to retain
- select:
    columns: [order_id, customer_id, total_amount, region_name]

drop

Remove columns from the DataFrame.

Key Type Required Default Description
columns list[string] yes -- Column names to drop
- drop:
    columns: [temp_flag, _raw_payload]

rename

Rename columns.

Key Type Required Default Description
columns dict[string, string] yes -- Map of old name to new name
- rename:
    columns:
      cust_id: customer_id
      amt: amount

cast

Change column data types.

Key Type Required Default Description
columns dict[string, string] yes -- Map of column name to Spark data type
- cast:
    columns:
      amount: decimal(18,2)
      order_date: date

dedup

Deduplicate rows by key columns.

Key Type Required Default Description
keys list[string] yes -- Columns to group by for deduplication
order_by string no null Sort expression to determine row ordering
keep string no "last" Which row to keep: "first" or "last"
- dedup:
    keys: [order_id]
    order_by: "updated_at"
    keep: last

sort

Sort rows.

Key Type Required Default Description
columns list[string] yes -- Columns to sort by
ascending bool no true Sort direction
- sort:
    columns: [region_name, order_date]
    ascending: false

union

Union the current DataFrame with one or more other sources.

Key Type Required Default Description
sources list[string] yes -- Source aliases to union with
mode string no "by_name" Union mode: "by_name" or "by_position"
allow_missing bool no false Allow missing columns (filled with nulls) in by_name mode
- union:
    sources: [archived_orders, pending_orders]
    mode: by_name
    allow_missing: true

aggregate

Group and aggregate rows.

Key Type Required Default Description
group_by list[string] no null Grouping columns. Omit for whole-DataFrame aggregation.
measures dict[string, SparkExpr] yes -- Map of output alias to aggregate expression. Must not be empty.
- aggregate:
    group_by: [region, product_category]
    measures:
      total_revenue: "sum(amount)"
      order_count: "count(*)"
      avg_price: "avg(unit_price)"

window

Apply window functions over a partition specification.

Key Type Required Default Description
functions dict[string, SparkExpr] yes -- Map of output column to window expression. Must not be empty.
partition_by list[string] yes -- Partition columns. Must not be empty.
order_by list[string] no null Ordering within each partition
frame WindowFrame no null Frame specification

A WindowFrame object has:

Key Type Required Default Description
type string yes -- "rows" or "range"
start int yes -- Frame start offset (use negative for preceding)
end int yes -- Frame end offset. Must be >= start.
- window:
    functions:
      row_num: "row_number()"
      running_total: "sum(amount)"
    partition_by: [customer_id]
    order_by: [order_date]
    frame:
      type: rows
      start: -2
      end: 0

pivot

Pivot rows into columns.

Key Type Required Default Description
group_by list[string] yes -- Grouping columns
pivot_column string yes -- Column whose distinct values become output columns
values list[string\|int\|float\|bool] yes -- Explicit pivot values. Must not be empty.
aggregate SparkExpr yes -- Aggregate expression applied per cell
- pivot:
    group_by: [customer_id]
    pivot_column: quarter
    values: ["Q1", "Q2", "Q3", "Q4"]
    aggregate: "sum(revenue)"

unpivot

Unpivot columns into rows.

Key Type Required Default Description
columns list[string] yes -- Columns to unpivot. Must not be empty.
name_column string yes -- Name for the output column containing original column names
value_column string yes -- Name for the output column containing values. Must differ from name_column.
- unpivot:
    columns: [jan_sales, feb_sales, mar_sales]
    name_column: month
    value_column: sales_amount

case_when

Create a column with conditional logic (SQL CASE WHEN equivalent).

Key Type Required Default Description
column string yes -- Output column name
cases list[CaseWhenBranch] yes -- Ordered list of when/then pairs. Must not be empty.
otherwise SparkExpr no null Default value when no condition matches

Each CaseWhenBranch has:

Key Type Required Default Description
when SparkExpr yes -- Boolean condition
then SparkExpr yes -- Value when condition is true
- case_when:
    column: customer_tier
    cases:
      - when: "lifetime_value > 10000"
        then: "'platinum'"
      - when: "lifetime_value > 5000"
        then: "'gold'"
      - when: "lifetime_value > 1000"
        then: "'silver'"
    otherwise: "'bronze'"

fill_null

Replace null values in specified columns.

Key Type Required Default Description
columns dict[string, any] yes -- Map of column name to fill value. Must not be empty.
- fill_null:
    columns:
      discount: 0
      notes: "N/A"
      is_active: true

coalesce

Return the first non-null value from an ordered list of source columns.

Key Type Required Default Description
columns dict[string, list[string]] yes -- Map of output column to ordered list of source columns. Both the map and each source list must not be empty.
- coalesce:
    columns:
      email: [work_email, personal_email, backup_email]
      phone: [mobile, home_phone]

string_ops

Apply a Spark SQL expression template across selected string columns. The {col} placeholder is replaced with each column name.

Key Type Required Default Description
columns list[string] yes -- Columns to transform
expr string yes -- Expression template containing {col} placeholder
on_empty string no "warn" Behavior when column list is empty: "warn" or "error"
- string_ops:
    columns: [first_name, last_name, city]
    expr: "trim(upper({col}))"

date_ops

Apply a Spark SQL expression template across selected date or timestamp columns. The {col} placeholder is replaced with each column name.

Key Type Required Default Description
columns list[string] yes -- Columns to transform
expr string yes -- Expression template containing {col} placeholder
on_empty string no "warn" Behavior when column list is empty: "warn" or "error"
- date_ops:
    columns: [created_at, updated_at]
    expr: "to_date({col})"

target

Defines where the thread writes its output.

Key Type Required Default Description
alias string no null Lakehouse table alias for Delta targets
path string no null File path for file-based targets
mapping_mode string no "auto" Column mapping: "auto" (pass-through) or "explicit" (only mapped columns)
columns dict[string, ColumnMapping] no null Per-column mapping specifications
partition_by list[string] no null Partition columns for the output table
audit_template string no null Name of an audit column template to apply
naming NamingConfig no null Column and table naming normalization

target.columns (ColumnMapping)

Key Type Required Default Description
expr SparkExpr no null Spark SQL expression for the column value. Mutually exclusive with drop.
type string no null Target data type (cast applied on write)
default any no null Default value when the source column is null
drop bool no false Drop this column from output. Mutually exclusive with expr.

target.naming (NamingConfig)

Key Type Required Default Description
columns NamingPattern no null Pattern for column names
tables NamingPattern no null Pattern for table names
exclude list[string] no [] Column names or glob patterns to exclude from normalization

Supported patterns: snake_case, camelCase, PascalCase, UPPER_SNAKE_CASE, Title_Snake_Case, Title Case, lowercase, UPPERCASE, none.

target:
  alias: curated_orders
  partition_by: [order_year]
  mapping_mode: auto
  columns:
    total_amount:
      type: "decimal(18,2)"
    _internal_flag:
      drop: true
  naming:
    columns: snake_case

write

Controls how data is written to the target.

Key Type Required Default Description
mode string no "overwrite" Write mode: "overwrite", "append", "merge"
match_keys list[string] no null Match keys for merge mode. Required when mode is "merge".
on_match string no "update" Merge behavior when a match is found: "update" or "ignore"
on_no_match_target string no "insert" Merge behavior for new source rows: "insert" or "ignore"
on_no_match_source string no "ignore" Merge behavior for missing source rows: "delete", "soft_delete", "ignore"
soft_delete_column string no null Column to flag soft deletes. Required when on_no_match_source is "soft_delete".
soft_delete_value string no "true" Value written to the soft delete column
write:
  mode: merge
  match_keys: [order_id]
  on_match: update
  on_no_match_target: insert
  on_no_match_source: soft_delete
  soft_delete_column: is_deleted
  soft_delete_value: "true"

keys

Key management for business keys, surrogate keys, and change detection hashes.

Key Type Required Default Description
business_key list[string] no null Columns forming the natural business key
surrogate_key SurrogateKeyConfig no null Surrogate key generation settings
change_detection ChangeDetectionConfig no null Change detection hash settings

keys.surrogate_key

Key Type Required Default Description
name string yes -- Output column name for the surrogate key
algorithm string no "sha256" Hash algorithm: "sha256" or "md5"

keys.change_detection

Key Type Required Default Description
name string yes -- Output column name for the change hash
columns list[string] yes -- Columns included in the hash
algorithm string no "md5" Hash algorithm: "md5" or "sha256"
keys:
  business_key: [order_id]
  surrogate_key:
    name: sk_order
    algorithm: sha256
  change_detection:
    name: change_hash
    columns: [status, amount, updated_at]
    algorithm: md5

load

Incremental load configuration with watermark tracking.

Key Type Required Default Description
mode string no "full" Load mode: "full", "incremental_watermark", "incremental_parameter", "cdc"
watermark_column string no null Column for watermark comparison. Required for incremental_watermark. Must not be set for cdc.
watermark_type string no null Data type of the watermark column: "timestamp", "date", "int", "long"
watermark_inclusive bool no false Include rows equal to the last watermark value (use with merge/overwrite for idempotency)
watermark_store WatermarkStoreConfig no null Watermark persistence backend
cdc CdcConfig no null CDC configuration. Required when mode is "cdc".

load.watermark_store

Key Type Required Default Description
type string no "table_properties" Store type: "table_properties" (zero-config) or "metadata_table"
table_path string no null Path to metadata table. Required for "metadata_table" type.

load.cdc

Key Type Required Default Description
preset string no null Preset name: "delta_cdf". Mutually exclusive with operation_column.
operation_column string no null Column containing operation type. Mutually exclusive with preset.
insert_value string no null Value indicating an insert operation
update_value string no null Value indicating an update operation
delete_value string no null Value indicating a delete operation
on_delete string no "hard_delete" Delete behavior: "hard_delete" or "soft_delete"

Either preset or operation_column must be set (but not both). When using explicit mapping, at least insert_value or update_value is required.

load:
  mode: incremental_watermark
  watermark_column: updated_at
  watermark_type: timestamp
  watermark_store:
    type: table_properties
load:
  mode: cdc
  cdc:
    preset: delta_cdf

validations

Pre-write data quality rules. Each rule is evaluated as a Spark SQL boolean expression against the DataFrame before it is written.

Key Type Required Default Description
name string yes -- Human-readable rule name
rule SparkExpr yes -- Spark SQL boolean expression
severity string no "error" Severity level: "info", "warn", "error", "fatal"
validations:
  - name: positive_amount
    rule: "amount > 0"
    severity: error
  - name: valid_status
    rule: "status IN ('active', 'inactive', 'pending')"
    severity: warn

assertions

Post-execution assertions evaluated against the target dataset after writing.

Key Type Required Default Description
type string yes -- Assertion type: "row_count", "column_not_null", "unique", "expression"
severity string no "warn" Severity level: "info", "warn", "error", "fatal"
columns list[string] no null Columns for column_not_null and unique assertions
min int no null Minimum value for row_count assertions
max int no null Maximum value for row_count assertions
expression SparkExpr no null Spark SQL expression for expression assertions
assertions:
  - type: row_count
    min: 1
    severity: error
  - type: column_not_null
    columns: [order_id, customer_id]
    severity: fatal
  - type: unique
    columns: [order_id]
  - type: expression
    expression: "count(CASE WHEN amount < 0 THEN 1 END) = 0"
    severity: warn

failure

Per-thread failure handling policy. Controls what happens to remaining threads in a weave when this thread fails.

Key Type Required Default Description
on_failure string no "abort_weave" Policy: "abort_weave", "skip_downstream", "continue"
  • abort_weave -- Stop all remaining threads in the weave immediately.
  • skip_downstream -- Skip only threads that depend on the failed thread.
  • continue -- Continue executing unrelated threads.
failure:
  on_failure: skip_downstream

execution

Runtime execution settings. These cascade from loom to weave to thread, with the most specific level winning.

Key Type Required Default Description
log_level string no "standard" Logging verbosity: "minimal", "standard", "verbose", "debug"
trace bool no true Collect execution spans for telemetry
execution:
  log_level: verbose
  trace: true

params

Typed parameter declarations. Parameters can be referenced in expressions using ${param.name} syntax.

Each entry maps a parameter name to a ParamSpec:

Key Type Required Default Description
name string yes -- Parameter name
type string yes -- Data type: "string", "int", "float", "bool", "date", "timestamp", "list[string]"
required bool no true Whether the parameter must be supplied at runtime
default any no null Default value when not supplied
description string no "" Human-readable description
params:
  start_date:
    name: start_date
    type: date
    required: true
    description: "Start of the reporting window"
  region_filter:
    name: region_filter
    type: string
    required: false
    default: "ALL"

Complete example

config_version: "1"

sources:
  orders:
    type: delta
    alias: raw_orders
    dedup:
      keys: [order_id]
      order_by: "updated_at DESC"
  customers:
    type: delta
    alias: dim_customers

steps:
  - filter:
      expr: "status != 'cancelled'"
  - join:
      source: customers
      type: left
      on: [customer_id]
  - derive:
      columns:
        total_amount: "quantity * unit_price"
        order_year: "year(order_date)"
  - select:
      columns:
        - order_id
        - customer_id
        - customer_name
        - total_amount
        - order_year
        - updated_at

target:
  alias: curated_orders
  partition_by: [order_year]
  naming:
    columns: snake_case

write:
  mode: merge
  match_keys: [order_id]
  on_match: update
  on_no_match_target: insert

keys:
  business_key: [order_id]
  surrogate_key:
    name: sk_order
  change_detection:
    name: change_hash
    columns: [total_amount, customer_name]

load:
  mode: incremental_watermark
  watermark_column: updated_at
  watermark_type: timestamp

validations:
  - name: positive_total
    rule: "total_amount >= 0"
    severity: error

assertions:
  - type: row_count
    min: 1
    severity: error
  - type: unique
    columns: [order_id]

failure:
  on_failure: skip_downstream

execution:
  log_level: standard
  trace: true

tags: [orders, curated]