Thread YAML Schema¶
A thread is the smallest unit of work in weevr. It defines one or more data sources, an ordered sequence of transformation steps, and a single output target. This page documents every key accepted inside a thread YAML file.
Threads can be used as parameterized templates via as and params on
ThreadEntry in a weave. See the
Thread Templates guide for usage.
Top-level keys¶
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| config_version | string | yes | -- | Schema version identifier (e.g. "1.0") |
| name | string | no | "" | Human-readable thread name. Typically set by the weave that references this thread. |
| sources | dict[string, Source] | yes | -- | Named data sources keyed by alias. Must contain at least one entry. |
| with | dict[string, SubPipeline] | no | null | Named sub-pipelines (CTEs) resolved before the main steps pipeline. |
| steps | list[Step] | no | [] | Ordered transformation pipeline |
| target | Target | yes | -- | Output destination |
| write | WriteConfig | no | null | Write mode and merge settings |
| keys | KeyConfig | no | null | Business key, surrogate key, and change detection |
| validations | list[ValidationRule] | no | null | Pre-write data quality rules |
| assertions | list[Assertion] | no | null | Post-execution assertions on the target |
| load | LoadConfig | no | null | Incremental load and watermark settings |
| tags | list[string] | no | null | Free-form tags for filtering and organization |
| params | dict[string, ParamSpec] | no | null | Typed parameter declarations |
| defaults | dict[string, any] | no | null | Default values inherited by nested structures |
| failure | FailureConfig | no | null | Per-thread failure handling policy |
| execution | ExecutionConfig | no | null | Runtime settings (logging, tracing) |
| cache | bool | no | null | Whether to cache the final DataFrame before writing |
| exports | list[Export] | no | null | Secondary output destinations. See Exports guide. |
| lookups | dict[string, Lookup] | no | null | Thread-level lookup definitions. Merged with weave-level lookups (thread wins on name collision). |
| column_sets | dict[string, ColumnSet] | no | null | Thread-level column set definitions for rename steps. Merged with weave-level column sets (thread wins). |
| variables | dict[string, VariableSpec] | no | null | Thread-level variable declarations. Resolved before pre_steps execution. |
| audit_templates | dict[string, AuditTemplate] | no | null | Named audit column templates available to targets in this thread. Merged with parent-level definitions (thread wins on name collision). A flat dict[string, string] shorthand is also accepted. |
| pre_steps | list[HookStep] | no | null | Hook steps to run before thread core execution. Not cascaded from weave — each level runs its own list. |
| post_steps | list[HookStep] | no | null | Hook steps to run after thread core execution. Not cascaded from weave. |
| connections | dict[string, OneLakeConnection] | no | null | Named OneLake connection declarations. Merged with weave/loom-level connections (thread wins on name collision). See Connections guide. |
sources¶
Each key in sources is a logical alias used to reference the source in
subsequent steps (e.g. in join, union). The value is a Source object.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| type | string | conditional | -- | Source type: "delta", "csv", "json", "parquet", "excel", "date_sequence", "int_sequence". Required when neither lookup nor connection is set. |
| lookup | string | conditional | null | Lookup name. Mutually exclusive with type. Resolved from the nearest lookups map (thread, weave, or loom level) at execution time. |
| alias | string | conditional | null | Lakehouse table alias. Required when type is "delta" without a connection. Mutually exclusive with connection. |
| path | string | conditional | null | File path. Required for file-based types (csv, json, parquet, excel). |
| options | dict[string, any] | no | {} | Reader options passed to Spark (e.g. header, delimiter) |
| dedup | DedupConfig | no | null | Deduplication applied immediately after reading |
| connection | string | conditional | null | Name of a connection declared in connections:. Requires table. Mutually exclusive with alias. |
| schema | string | no | null | Schema override within the connection's lakehouse. Overrides connection.default_schema. |
| table | string | conditional | null | Table name within the connection's lakehouse. Required when connection is set. |
| start | string \| int | conditional | -- | Range start (inclusive). Required for generated types (date_sequence, int_sequence). |
| end | string \| int | conditional | -- | Range end (inclusive). Required for generated types (date_sequence, int_sequence). |
| column | string | conditional | -- | Output column name. Required for generated types (date_sequence, int_sequence). |
| step | string \| int | no | "day" / 1 | Step interval. For date_sequence: "day", "week", "month", or "year". For int_sequence: a positive integer. |
sources.dedup¶
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| keys | list[string] | yes | -- | Columns used for deduplication grouping |
| order_by | string | no | null | Sort expression to determine which row to keep |
sources:
  orders:
    type: delta
    alias: raw_orders
    dedup:
      keys: [order_id]
      order_by: "updated_at DESC"
  regions:
    type: csv
    path: /mnt/data/regions.csv
    options:
      header: "true"
      delimiter: ","
  products:
    lookup: dim_product   # resolved from the weave's lookups map
Generated Sources¶
Generated sources produce synthetic data at runtime — no lakehouse read or file path is needed. They are useful for building calendar spines, surrogate key ranges, or any sequence-based reference table.
Two generated types are available: date_sequence and int_sequence. Both
produce a single-column DataFrame covering a closed range [start, end].
Mutual exclusivity: generated types (date_sequence, int_sequence) are
incompatible with alias, path, options, dedup, connection, and
lookup. Setting any of those keys alongside a generated type raises a
validation error.
date_sequence¶
Produces a single DateType column containing one row per calendar interval
from start to end (both inclusive).
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| type | string | yes | -- | Must be "date_sequence" |
| start | string | yes | -- | Start date, inclusive. ISO-8601 format ("YYYY-MM-DD"). |
| end | string | yes | -- | End date, inclusive. ISO-8601 format ("YYYY-MM-DD"). |
| column | string | yes | -- | Name of the output column. |
| step | string | no | "day" | Step interval: "day", "week", "month", or "year". |
int_sequence¶
Produces a single LongType column containing one row per integer from start
to end (both inclusive).
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| type | string | yes | -- | Must be "int_sequence" |
| start | int | yes | -- | Start value, inclusive. |
| end | int | yes | -- | End value, inclusive. |
| column | string | yes | -- | Name of the output column. |
| step | int | no | 1 | Increment between values. Must be a positive integer. |
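For illustration, a calendar spine and a surrogate key range built from the two generated types (the source names, column names, and ranges are illustrative):

sources:
  calendar:
    type: date_sequence
    start: "2020-01-01"
    end: "2030-12-31"
    column: calendar_date
    step: day
  sk_range:
    type: int_sequence
    start: 1
    end: 10000
    column: sk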
with¶
Named sub-pipelines that produce intermediate DataFrames before the main
steps pipeline runs. Each entry defines a CTE-like named DataFrame that
downstream steps (and other CTEs) can reference as a source.
Each key in with is a logical CTE name. The value is a SubPipeline
object:
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| from | string | yes | -- | Source alias or earlier CTE name to start the sub-pipeline from |
| steps | list[Step] | yes | -- | Ordered pipeline steps applied to the source DataFrame |
Semantics:
- Declaration order matters. A CTE may only reference sources or CTEs declared before it — forward references are not allowed.
- CTE names share the source namespace. A CTE name must not collide with any key in sources.
- CTEs can chain: a later CTE's from: may name an earlier CTE.
- Unused CTEs (not referenced by any join, union, or the main steps pipeline) produce a validation warning at parse time.
sources:
  invoices:
    type: delta
    alias: raw.invoices
  invoice_lines:
    type: delta
    alias: raw.invoice_lines
  plants:
    type: delta
    alias: raw.plants

with:
  # Pre-aggregate invoice lines before joining to invoice header
  agg_lines:
    from: invoice_lines
    steps:
      - filter:
          expr: "line_status = 'POSTED'"
      - aggregate:
          group_by: [invoice_id]
          measures:
            net_amount: "sum(net_value)"
            line_count: "count(*)"

  # Enrich plants with region mapping
  plants_enriched:
    from: plants
    steps:
      - filter:
          expr: "plant_active = true"
      - derive:
          columns:
            region_code: "substring(plant_id, 1, 2)"

steps:
  - join:
      source: agg_lines          # reference the CTE above
      type: left
      on: [invoice_id]
  - join:
      source: plants_enriched    # reference the second CTE
      type: left
      on: [plant_id]
steps¶
An ordered list of transformation steps. Each step is a single-key object
where the key identifies the step type. weevr supports 23 step types.
In addition, foreach macro blocks are accepted and expanded
before model hydration.
filter¶
Filter rows using a Spark SQL expression.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| expr | SparkExpr | yes | -- | Boolean Spark SQL expression |
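A minimal sketch (the predicate and column names are illustrative):

- filter:
    expr: "country = 'US' AND amount > 0"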
derive¶
Create one or more new columns from Spark SQL expressions.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | dict[string, SparkExpr] | yes | -- | Map of output column name to expression |
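A minimal sketch (column names and expressions are illustrative):

- derive:
    columns:
      total_amount: "quantity * unit_price"
      order_year: "year(order_date)"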
join¶
Join the current DataFrame with another named source.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| source | string | yes | -- | Name of the source to join with |
| type | string | no | "inner" | Join type: inner, left, right, full, cross, semi, anti |
| on | list[string or JoinKeyPair] | yes | -- | Join keys. Strings are treated as same-name keys on both sides. |
| null_safe | bool | no | true | Use null-safe equality for join conditions |
| alias | string | no | null | DataFrame alias applied to the right-side source before joining. Resolves column name ambiguity via Spark .alias(). |
| filter | string | no | null | Spark SQL predicate applied to the right-side DataFrame before the join. Reduces shuffle size for selective joins. |
| include | list[string] | no | null | Glob patterns controlling which right-side columns survive into the result. Applied after duplicate key column removal. |
| exclude | list[string] | no | null | Glob patterns for right-side columns to drop from the result. Applied after include. |
| rename | dict[string, string] | no | null | Rename right-side columns after join. Map of old name to new name. Applied after prefix. |
| prefix | string | no | null | String prepended to every surviving right-side column name. Applied before rename. |
A JoinKeyPair has left and right fields for asymmetric key names.
Column processing order (right-side columns only):
1. Duplicate join key columns are dropped automatically
2. include glob patterns are applied — only matching columns are kept
3. exclude glob patterns are applied — matching columns are removed
4. prefix is prepended to all surviving column names
5. rename mappings are applied last
- join:
    source: regions
    type: left
    on:
      - region_id
      - { left: src_code, right: region_code }

# With column control
- join:
    source: sap_plants
    type: left
    on: [plant_id]
    filter: "plant_status = 'ACTIVE'"
    include: ["plant_*", "region_code"]
    exclude: ["plant_internal_*"]
    prefix: "src_"
    rename:
      src_plant_name: plant_name
select¶
Keep only the listed columns.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | list[string] | yes | -- | Column names to retain |
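For example (column names are illustrative):

- select:
    columns: [order_id, customer_id, total_amount]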
drop¶
Remove columns from the DataFrame.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | list[string] | yes | -- | Column names to drop |
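For example, dropping intermediate helper columns (names are illustrative):

- drop:
    columns: [_stg_hash, _tmp_flag]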
rename¶
Rename columns. Supports both static mappings and named column set
references. When both column_set and columns are specified,
static entries in columns override matching column set entries.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | dict[string, string] | no | {} | Map of old name to new name |
| column_set | string | no | null | Name of a column set defined at weave or loom level |
# Static rename only
- rename:
    columns:
      cust_id: customer_id
      amt: amount

# Column set with static overrides
- rename:
    column_set: sap_dictionary
    columns:
      MANUAL_COL: manual_override   # static wins
cast¶
Change column data types.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | dict[string, string] | yes | -- | Map of column name to Spark data type |
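A minimal sketch (column names are illustrative):

- cast:
    columns:
      order_id: long
      amount: "decimal(18,2)"
      order_date: date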
dedup¶
Deduplicate rows by key columns.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| keys | list[string] | yes | -- | Columns to group by for deduplication |
| order_by | string | no | null | Sort expression to determine row ordering |
| keep | string | no | "last" | Which row to keep: "first" or "last" |
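A minimal sketch (column names are illustrative):

- dedup:
    keys: [order_id]
    order_by: "updated_at"
    keep: last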
sort¶
Sort rows.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | list[string] | yes | -- | Columns to sort by |
| ascending | bool | no | true | Sort direction |
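For example (column names are illustrative):

- sort:
    columns: [order_date, order_id]
    ascending: false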
union¶
Union the current DataFrame with one or more other sources.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| sources | list[string] | yes | -- | Source aliases to union with |
| mode | string | no | "by_name" | Union mode: "by_name" or "by_position" |
| allow_missing | bool | no | false | Allow missing columns (filled with nulls) in by_name mode |
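A minimal sketch (source aliases are illustrative):

- union:
    sources: [orders_2023, orders_2024]
    mode: by_name
    allow_missing: true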
aggregate¶
Group and aggregate rows.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| group_by | list[string] | no | null | Grouping columns. Omit for whole-DataFrame aggregation. |
| measures | dict[string, SparkExpr] | yes | -- | Map of output alias to aggregate expression. Must not be empty. |
- aggregate:
    group_by: [region, product_category]
    measures:
      total_revenue: "sum(amount)"
      order_count: "count(*)"
      avg_price: "avg(unit_price)"
window¶
Apply window functions over a partition specification.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| functions | dict[string, SparkExpr] | yes | -- | Map of output column to window expression. Must not be empty. |
| partition_by | list[string] | yes | -- | Partition columns. Must not be empty. |
| order_by | list[string] | no | null | Ordering within each partition |
| frame | WindowFrame | no | null | Frame specification |
A WindowFrame object has:
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| type | string | yes | -- | "rows" or "range" |
| start | int | yes | -- | Frame start offset (use negative for preceding) |
| end | int | yes | -- | Frame end offset. Must be >= start. |
- window:
    functions:
      row_num: "row_number()"
      running_total: "sum(amount)"
    partition_by: [customer_id]
    order_by: [order_date]
    frame:
      type: rows
      start: -2
      end: 0
pivot¶
Pivot rows into columns.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| group_by | list[string] | yes | -- | Grouping columns |
| pivot_column | string | yes | -- | Column whose distinct values become output columns |
| values | list[string\|int\|float\|bool] | yes | -- | Explicit pivot values. Must not be empty. |
| aggregate | SparkExpr | yes | -- | Aggregate expression applied per cell |
- pivot:
    group_by: [customer_id]
    pivot_column: quarter
    values: ["Q1", "Q2", "Q3", "Q4"]
    aggregate: "sum(revenue)"
unpivot¶
Unpivot columns into rows.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | list[string] | yes | -- | Columns to unpivot. Must not be empty. |
| name_column | string | yes | -- | Name for the output column containing original column names |
| value_column | string | yes | -- | Name for the output column containing values. Must differ from name_column. |
case_when¶
Create a column with conditional logic (SQL CASE WHEN equivalent).
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| column | string | yes | -- | Output column name |
| cases | list[CaseWhenBranch] | yes | -- | Ordered list of when/then pairs. Must not be empty. |
| otherwise | SparkExpr | no | null | Default value when no condition matches |
Each CaseWhenBranch has:
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| when | SparkExpr | yes | -- | Boolean condition |
| then | SparkExpr | yes | -- | Value when condition is true |
- case_when:
    column: customer_tier
    cases:
      - when: "lifetime_value > 10000"
        then: "'platinum'"
      - when: "lifetime_value > 5000"
        then: "'gold'"
      - when: "lifetime_value > 1000"
        then: "'silver'"
    otherwise: "'bronze'"
fill_null¶
Replace null values in specified columns. Supports explicit column-to-value mapping and schema-driven type defaults. Both modes may appear in the same step — type defaults apply first, then explicit columns override.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | dict[string, any] | cond. | -- | Map of column name to fill value |
| mode | string | cond. | -- | Set to "type_defaults" for schema-driven fills |
| code | string | cond. | -- | Semantic code: "unknown", "not_applicable", or "invalid". Required when mode is set. |
| include | list[string] | no | all | Glob patterns restricting which columns are filled |
| exclude | list[string] | no | none | Glob patterns excluding columns from fill |
| overrides | dict[string, any] | no | -- | Per-column overrides for type-based defaults |
| where | string | no | -- | Spark SQL predicate for conditional fill |
At least one of columns or mode must be set.
Explicit columns:
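A minimal sketch (column names and fill values are illustrative):

- fill_null:
    columns:
      region: "Unknown"
      discount: 0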
Type-aware mode:
- fill_null:
    mode: type_defaults
    code: unknown
    exclude: [id, "*_id"]
    overrides:
      region: "Unspecified"
Type-default mappings:
| Spark type | unknown | not_applicable | invalid |
|---|---|---|---|
| String | "Unknown" | "Not Applicable" | "Invalid" |
| Boolean | false | false | false |
| Integer/Long | 0 | 0 | 0 |
| Float/Double | 0.0 | 0.0 | 0.0 |
| Decimal | 0 | 0 | 0 |
| Date | 1970-01-01 | 1970-01-01 | 1970-01-01 |
| Timestamp | epoch | epoch | epoch |
Composable (both modes):
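A sketch combining both modes; type defaults apply first, then the explicit column overrides them (column names and values are illustrative):

- fill_null:
    mode: type_defaults
    code: not_applicable
    columns:
      region: "Unspecified"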
Conditional fill:
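A sketch using where to restrict the fill to matching rows (the predicate is illustrative):

- fill_null:
    columns:
      discount: 0
    where: "order_type = 'ONLINE'"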
coalesce¶
Return the first non-null value from an ordered list of source columns.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | dict[string, list[string]] | yes | -- | Map of output column to ordered list of source columns. Both the map and each source list must not be empty. |
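A minimal sketch (column names are illustrative):

- coalesce:
    columns:
      phone: [mobile_phone, home_phone, work_phone]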
string_ops¶
Apply a Spark SQL expression template across selected string columns.
The {col} placeholder is replaced with each column name.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | list[string] | yes | -- | Columns to transform |
| expr | string | yes | -- | Expression template containing {col} placeholder |
| on_empty | string | no | "warn" | Behavior when column list is empty: "warn" or "error" |
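For illustration, trimming and upper-casing a set of string columns (column names are illustrative):

- string_ops:
    columns: [first_name, last_name, city]
    expr: "trim(upper({col}))"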
date_ops¶
Apply a Spark SQL expression template across selected date or timestamp
columns. The {col} placeholder is replaced with each column name.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | list[string] | yes | -- | Columns to transform |
| expr | string | yes | -- | Expression template containing {col} placeholder |
| on_empty | string | no | "warn" | Behavior when column list is empty: "warn" or "error" |
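For illustration, truncating several date columns to month start (column names are illustrative):

- date_ops:
    columns: [order_date, ship_date]
    expr: "date_trunc('month', {col})"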
concat¶
Concatenate multiple columns into a single string column. Null handling, separator, and trimming are configurable.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| target | string | yes | -- | Output column name |
| columns | list[string] | yes | -- | Ordered source columns. Must not be empty. |
| separator | string | no | "" | String inserted between column values |
| null_mode | string | no | "skip" | How nulls are handled: "skip", "empty", or "literal" |
| null_literal | string | no | "<NULL>" | Replacement text when null_mode is "literal" |
| trim | boolean | no | false | Trim whitespace from inputs and result |
| collapse_separators | boolean | no | true | Collapse adjacent separators. Only applies when null_mode is "skip". |
Non-string columns are auto-cast to string before concatenation.
When all input columns are null or blank, the result is NULL.
- concat:
    target: full_address
    columns: [street, city, state, zip]
    separator: ", "
    null_mode: skip
    trim: true
map¶
Map discrete values in a column using a lookup dictionary.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| column | string | yes | -- | Source column to map |
| target | string | no | source column | Output column name |
| values | dict[string, any] | yes | -- | Value mapping dictionary. Must not be empty. |
| default | any | no | null | Value for unmapped inputs. Mutually exclusive with unmapped: null\|validate. |
| on_null | any | no | null | Value when source is null. Falls back to default if not set. |
| unmapped | string | no | "keep" | Behavior for unmapped values: "keep", "null", or "validate" |
| case_sensitive | boolean | no | true | Whether value matching is case-sensitive |
Null handling cascade: on_null (if set) then default
(if set) then null passthrough.
- map:
    column: status_code
    target: status_label
    values:
      A: Active
      I: Inactive
      C: Closed
    default: Unknown
format¶
Format columns using pattern, number, or date rules. Multiple columns can be formatted in a single step. Each key is a target column name with a format specification.
Format spec keys:
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| source | string | no | target name | Source column (in-place when omitted) |
| pattern | string | cond. | -- | String pattern with {pos:len} placeholders (1-indexed) |
| number | string | cond. | -- | Java DecimalFormat pattern |
| date | string | cond. | -- | Java SimpleDateFormat pattern |
| on_short | string | no | "null" | Behavior when source is shorter than pattern: "null" or "partial" |
| strict_types | boolean | no | false | Reject type mismatches instead of auto-casting |
Exactly one of pattern, number, or date must be set
per column.
- format:
    phone:
      pattern: "({1:3}){4:3}-{7:4}"
      source: raw_phone
    amount_display:
      number: "#,##0.00"
      source: amount
    event_date:
      date: "yyyy-MM-dd"
resolve¶
Resolve foreign keys by joining to a named lookup dimension. Assigns sentinel values for invalid (null/blank) and unknown (no match) business keys. Supports single FK, compound BK, SCD2 narrowing, batch mode, and include columns.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | yes* | -- | Output FK column name |
| lookup | string | yes* | -- | Named lookup reference |
| match | string, list, or dict | yes* | -- | BK column mapping (sugar: string/list/dict) |
| pk | string | yes* | -- | Surrogate key column in lookup |
| on_invalid | int | no | -4 | Sentinel for incomplete BK |
| on_unknown | int | no | -1 | Sentinel for no match |
| on_duplicate | string | no | "warn" | "error", "warn", "first" |
| on_failure | string | no | "abort" | "abort", "warn" |
| normalize | string | no | null | "trim_lower", "trim_upper", "trim", "none" |
| drop_source_columns | bool | no | false | Drop source columns after resolve |
| include | list or dict | no | null | Extra columns from lookup. See syntax below. |
| include_prefix | string | no | null | Prefix for included columns |
| effective | EffectiveConfig | no | null | SCD2 narrowing (date range or current flag) |
| where | string | no | null | SQL predicate filter on lookup |
| batch | list[ResolveBatchItem] | no | null | Batch FK specs (mutually exclusive with name/lookup/match) |
*Required in single mode. In batch mode, these fields are optional at the top level and serve as shared defaults merged into each batch item.
include syntax:
include accepts three forms (a combined sketch appears below):
- String list — column names to carry over from the lookup as-is.
- Dict — map of lookup column to output alias (rename on include).
- Object list — each entry is a {column, as} pair for explicit per-column renaming in a list form.

include_prefix applies after all three forms and prepends a string to every included column name. include_prefix and dict/object rename are mutually exclusive per column.
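The three forms side by side; each fragment shows only the include key of a resolve step, and the column names are illustrative:

# String list: carried over as-is
include: [customer_name, customer_tier]

# Dict: rename on include
include:
  customer_name: cust_name
  customer_tier: tier

# Object list: explicit per-column rename
include:
  - { column: customer_name, as: cust_name }
  - { column: customer_tier, as: tier }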
See the Resolve Step guide for detailed examples of all modes and features.
- resolve:
    name: plant_id
    lookup: dim_plant
    match: plant_code
    pk: id
    on_invalid: -4
    on_unknown: -1

# With include columns
- resolve:
    name: customer_id
    lookup: dim_customer
    match: customer_code
    pk: sk_customer
    include:
      - { column: customer_name, as: cust_name }
      - { column: customer_tier, as: tier }
foreach¶
A macro block that expands into repeated step sequences before model hydration. Not a step type itself — expanded by the config preprocessor.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| values | list | yes | -- | Items to iterate over |
| as | string | yes | -- | Variable name bound to each item during expansion |
| steps | list[Step] | yes | -- | Step template repeated for each value |
steps:
  - foreach:
      values: [revenue, cost, margin]
      as: col
      steps:
        - derive:
            columns:
              "${col}_rounded": "round(${col}, 2)"
target¶
Defines where the thread writes its output.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| alias | string | cond. | null | Lakehouse table alias for Delta targets. Mutually exclusive with connection. |
| path | string | cond. | null | File path for file-based targets. |
| connection | string | cond. | null | Name of a connection declared in connections:. Requires table. Mutually exclusive with alias. |
| schema | string | no | null | Schema override within the connection's lakehouse. Overrides connection.default_schema. |
| table | string | cond. | null | Table name within the connection's lakehouse. Required when connection is set. |
| mapping_mode | string | no | "auto" | Column mapping: "auto" or "explicit" |
| columns | dict[string, ColumnMapping] | no | null | Per-column mapping specifications |
| partition_by | list[string] | no | null | Partition columns for the output table |
| audit_columns | dict[string, string] | no | null | Audit column definitions as name-expression pairs |
| audit_template | string or list[string] | no | null | Named template(s) to apply. Resolved from audit_templates at any level or from built-in presets (fabric, minimal). See Audit Templates guide. |
| audit_template_inherit | bool | no | true | When false, suppresses any audit_template inherited from parent levels. Direct audit_template on this target still applies. |
| audit_columns_exclude | list[string] | no | null | Column names or glob patterns to exclude from the resolved template set. |
| naming | NamingConfig | no | null | Column and table naming normalization |
| dimension | DimensionConfig | no | null | Dimension target mode with composable SCD flags. Mutually exclusive with fact. See Dimension Modeling guide. |
| fact | FactConfig | no | null | Fact target mode with FK validation. Mutually exclusive with dimension. See Fact Tables guide. |
| seed | SeedConfig | no | null | Seed rows inserted on first write or when table is empty. |
| warp | string \| false \| null | no | null | Warp schema contract reference. A string names an explicit .warp file; false opts out of auto-discovery; null enables auto-discovery. |
| warp_mode | "auto" \| null | no | null | Warp generation mode. "auto" writes/updates the .warp file after each successful run. |
| warp_init | bool | no | false | When true and a warp exists, create the Delta table from the effective warp before the first pipeline run. |
| warp_enforcement | "warn" \| "enforce" \| "off" | no | "warn" | Warp contract enforcement mode. "enforce" fails the thread on violations; "warn" logs findings; "off" skips validation. |
| schema_drift | "lenient" \| "strict" \| "adaptive" | no | "lenient" | Schema drift handling mode. "lenient" passes extra columns through; "strict" applies on_drift action; "adaptive" passes through and extends auto-generated warps. |
| on_drift | "error" \| "warn" \| "ignore" | no | "warn" | Severity action for strict drift mode. "error" aborts the thread; "warn" drops extra columns with a warning; "ignore" drops silently. |
At least one of alias, path, or connection + table is required.
target.columns (ColumnMapping)¶
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| expr | SparkExpr | no | null | Spark SQL expression for the column value. Mutually exclusive with drop. |
| type | string | no | null | Target data type (cast applied on write) |
| default | any | no | null | Default value when the source column is null |
| drop | bool | no | false | Drop this column from output. Mutually exclusive with expr. |
target.naming (NamingConfig)¶
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | NamingPattern | no | null | Pattern for column names |
| tables | NamingPattern | no | null | Pattern for table names |
| exclude | list[string] | no | [] | Column names or glob patterns to exclude from normalization |
| on_collision | string | no | "error" | Behavior when normalization produces duplicate names: "error" or "suffix" |
| reserved_words | ReservedWordConfig | no | null | Reserved word protection. See Weave schema: naming for details. |
Supported patterns: snake_case, camelCase, PascalCase,
UPPER_SNAKE_CASE, Title_Snake_Case, Title Case, lowercase,
UPPERCASE, kebab-case, none.
target:
  alias: curated_orders
  partition_by: [order_year]
  mapping_mode: auto
  columns:
    total_amount:
      type: "decimal(18,2)"
    _internal_flag:
      drop: true
  naming:
    columns: snake_case
target.dimension (DimensionConfig)¶
Composable dimension target with behavioral flags replacing traditional SCD type labels. See the Dimension Modeling guide for detailed examples.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| business_key | list[string] | yes | -- | Natural key columns |
| surrogate_key | DimensionSurrogateKeyConfig | yes | -- | SK generation (see below) |
| track_history | bool | no | false | Enable SCD versioning (close-and-insert) |
| change_detection | dict[string, GroupConfig] | no | auto | Named change detection groups. Defaults to a single auto group (on_change: version if track_history, else overwrite). |
| previous_columns | dict[string, string] | no | null | Map of output → source for prior-value tracking |
| additional_keys | dict[string, KeyConfig] | no | null | Secondary hash key definitions |
| columns | ScdColumnConfig | no | defaults | SCD column names: valid_from (_valid_from), valid_to (_valid_to), is_current (_is_current) |
| dates | ScdDateConfig | no | defaults | SCD boundary dates: min (1970-01-01), max (9999-12-31) |
| seed_system_members | bool | no | false | Insert Kimball sentinel rows on first write |
| system_members | list[SystemMember] | no | defaults | Custom sentinel rows (sk, code, label). Defaults: -1/unknown, -2/not_applicable |
| label_column | string | no | null | Column for system member labels |
| history_filter | bool | no | true | Filter target reads to is_current = true |
dimension.surrogate_key:
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | yes | -- | Output column name |
| algorithm | string | no | "sha256" | Hash algorithm |
| columns | list[string] | yes | -- | Source columns to hash |
| output | string | no | "native" | Output type: "native" or "string" |
dimension.change_detection group:
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | no | dict key | Output hash column name |
| algorithm | string | no | "sha256" | Hash algorithm |
| columns | list[string] or "auto" | yes | -- | Columns to hash. "auto" selects remaining data columns. |
| on_change | string | yes | -- | "version", "overwrite", or "static" |
| output | string | no | "native" | Output type |
write/keys interaction
When dimension: is present, write.match_keys,
write.on_match, keys.business_key,
keys.surrogate_key, and keys.change_detection are
forbidden — the engine derives these from the dimension
block. write.on_no_match_source and
write.on_no_match_target remain available as overrides.
target:
  path: Tables/dim_customer
  dimension:
    business_key: [customer_id]
    surrogate_key:
      name: _sk_customer
      columns: [customer_id]
    track_history: true
    seed_system_members: true
target.fact (FactConfig)¶
Fact target mode providing FK column validation and sentinel value documentation. See the Fact Tables guide.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| foreign_keys | list[string] | yes | -- | FK column names to validate exist in output |
| sentinel_values | SentinelValueConfig | no | defaults | Sentinel value conventions |
fact.sentinel_values:
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| invalid | int | no | -4 | Sentinel for incomplete business keys |
| missing | int | no | -1 | Sentinel for unmatched lookups |
target:
  path: Tables/fact_orders
  fact:
    foreign_keys: [sk_customer, sk_product, sk_date]
    sentinel_values:
      invalid: -4
      missing: -1
target.seed (SeedConfig)¶
Seed rows inserted into the target on first write or when the table is empty.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| on | string | no | "first_write" | Trigger: "first_write" (table absent) or "empty" (table absent or zero rows) |
| rows | list[dict] | yes | -- | Row data as column-value dicts |
write¶
Controls how data is written to the target.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| mode | string | no | "overwrite" | Write mode: "overwrite", "append", "merge" |
| match_keys | list[string] | no | null | Match keys for merge mode. Required when mode is "merge". |
| on_match | string | no | "update" | Merge behavior when a match is found: "update" or "ignore" |
| on_no_match_target | string | no | "insert" | Merge behavior for new source rows: "insert" or "ignore" |
| on_no_match_source | string | no | "ignore" | Merge behavior for missing source rows: "delete", "soft_delete", "ignore" |
| soft_delete_column | string | no | null | Column to flag soft deletes. Required when on_no_match_source is "soft_delete". |
| soft_delete_value | bool | no | true | Value written to the soft delete column when a row is soft-deleted. |
| soft_delete_active_value | bool | no | null | Value written to the soft delete column for active rows (matched updates and new inserts). When omitted, those rows receive null. |
write:
  mode: merge
  match_keys: [order_id]
  on_match: update
  on_no_match_target: insert
  on_no_match_source: soft_delete
  soft_delete_column: is_deleted
  soft_delete_value: true
  soft_delete_active_value: false
keys¶
Key management for business keys, surrogate keys, and change detection hashes.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| business_key | list[string] | no | null | Columns forming the natural business key |
| surrogate_key | SurrogateKeyConfig | no | null | Surrogate key generation settings |
| change_detection | ChangeDetectionConfig | no | null | Change detection hash settings |
keys.surrogate_key¶
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | yes | -- | Output column name for the surrogate key |
| algorithm | string | no | "sha256" | Hash algorithm: "xxhash64", "sha1", "sha256", "sha384", "sha512", "md5", "crc32", "murmur3" |
| output | string | no | "native" | Output type for integer-returning algorithms (xxhash64, crc32, murmur3). "native" preserves the algorithm's return type; "string" casts to StringType. Has no effect on sha*/md5. |
Algorithm choice
crc32 has a small 32-bit output space with higher collision risk.
murmur3 (Spark's hash()) may produce different results across
Spark major versions. Neither is recommended for high-cardinality
surrogate key use cases.
keys vs dimension surrogate key
keys.surrogate_key uses SurrogateKeyConfig (hash-based, no
columns field). target.dimension.surrogate_key uses
DimensionSurrogateKeyConfig (requires columns for SK
generation from business key columns). These are separate
types with different fields.
keys.change_detection¶
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | yes | -- | Output column name for the change hash |
| columns | list[string] | yes | -- | Columns included in the hash |
| algorithm | string | no | "md5" | Hash algorithm: "xxhash64", "sha1", "sha256", "sha384", "sha512", "md5", "crc32", "murmur3" |
| output | string | no | "native" | Output type for integer-returning algorithms. "native" preserves the algorithm's return type; "string" casts to StringType. |
keys:
  business_key: [order_id]
  surrogate_key:
    name: sk_order
    algorithm: sha256
  change_detection:
    name: change_hash
    columns: [status, amount, updated_at]
    algorithm: md5
load¶
Incremental load configuration with watermark tracking.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| mode | string | no | "full" | Load mode: "full", "incremental_watermark", "incremental_parameter", "cdc" |
| watermark_column | string | no | null | Column for watermark comparison. Required for incremental_watermark. May be combined with cdc mode when cdc.operation_column is set, to narrow the read window for append-only CDC history tables; rejected when cdc.preset is "delta_cdf". |
| watermark_type | string | no | null | Data type of the watermark column: "timestamp", "date", "int", "long". Required whenever watermark_column is set. |
| watermark_inclusive | bool | no | false | Include rows equal to the last watermark value (use with merge/overwrite for idempotency) |
| watermark_store | WatermarkStoreConfig | no | null | Watermark persistence backend |
| cdc | CdcConfig | no | null | CDC configuration. Required when mode is "cdc". |
| watermark_format | string | no | null | Spark DateTimeFormatter pattern used to parse a string-typed watermark_column at read time. Only valid with watermark_type of "timestamp" or "date". |
load.watermark_store¶
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| type | string | no | "table_properties" | Store type: "table_properties" (zero-config) or "metadata_table" |
| table_path | string | no | null | Path to metadata table. Required for "metadata_table" type. |
load.cdc¶
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| preset | string | no | null | Preset name: "delta_cdf". Mutually exclusive with operation_column. |
| operation_column | string | no | null | Column containing operation type. Mutually exclusive with preset. |
| insert_value | string | no | null | Value indicating an insert operation |
| update_value | string | no | null | Value indicating an update operation |
| delete_value | string | no | null | Value indicating a delete operation |
| on_delete | string | no | "hard_delete" | Delete behavior: "hard_delete" or "soft_delete" |
Either preset or operation_column must be set (but not both). When using
explicit mapping, at least insert_value or update_value is required.
When cdc.operation_column is set (the explicit mapping path), load.mode: cdc
may be combined with watermark_column and watermark_type to narrow the
read window. This is the recommended pattern for append-only CDC history
tables — for example, SAP data landed by Fabric Open Database Mirror, where
every change row carries a change timestamp like AEDATTM. Without the
watermark, weevr re-reads the full history on every run; with it, the second
and subsequent runs only read rows past the persisted high-water mark. The
HWM is captured from the source DataFrame before operation routing, so
delete rows participate in advancing the window.
The cdc.preset: delta_cdf path is unchanged — it relies on Delta Change
Data Feed's commit-version tracking and rejects watermark_column because
the two mechanisms are redundant.
# Incremental watermark load
load:
  mode: incremental_watermark
  watermark_column: updated_at
  watermark_type: timestamp
  watermark_store:
    type: table_properties

# CDC with watermark column — generic CDC composed with a watermark.
# Recommended for SAP Open Database Mirror history tables and similar
# append-only CDC sources.
load:
  mode: cdc
  cdc:
    operation_column: OPFLAG
    insert_value: "I"
    update_value: "U"
    delete_value: "D"
    on_delete: soft_delete
  watermark_column: AEDATTM
  watermark_type: timestamp
  # watermark_store omitted: defaults to table_properties on the target Delta table

# CDC with a string-typed watermark column — declares the parse pattern.
# Use this for SAP/mainframe sources where AEDATTM lands as text rather
# than a Delta TIMESTAMP column.
load:
  mode: cdc
  cdc:
    operation_column: OPFLAG
    insert_value: "I"
    update_value: "U"
    delete_value: "D"
  watermark_column: AEDATTM
  watermark_type: timestamp
  watermark_format: "yyyy-MM-dd HH:mm:ss.SSSSSX"
validations¶
Pre-write data quality rules. Each rule is evaluated as a Spark SQL boolean expression against the DataFrame before it is written.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | yes | -- | Human-readable rule name |
| rule | SparkExpr | yes | -- | Spark SQL boolean expression |
| severity | string | no | "error" | Severity level: "info", "warn", "error", "fatal" |
validations:
  - name: positive_amount
    rule: "amount > 0"
    severity: error
  - name: valid_status
    rule: "status IN ('active', 'inactive', 'pending')"
    severity: warn
assertions¶
Post-execution assertions evaluated against the target dataset after writing.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| type | string | yes | -- | Assertion type: "row_count", "column_not_null", "unique", "expression", "fk_sentinel_rate" |
| severity | string | no | "warn" | Severity level: "info", "warn", "error", "fatal" |
| columns | list[string] | no | null | Columns for column_not_null, unique, and fk_sentinel_rate assertions |
| min | int | no | null | Minimum value for row_count assertions |
| max | int | no | null | Maximum value for row_count assertions |
| expression | SparkExpr | no | null | Spark SQL expression for expression assertions |
| column | string | cond. | null | Single column for fk_sentinel_rate (mutually exclusive with columns) |
| sentinel | int or string | cond. | null | Single sentinel value for fk_sentinel_rate (mutually exclusive with sentinels) |
| sentinels | dict | cond. | null | Named sentinel groups for fk_sentinel_rate (dict-of-int or dict-of-dict) |
| max_rate | float | no | null | Maximum sentinel rate threshold for fk_sentinel_rate |
| message | string | no | null | Custom failure message for fk_sentinel_rate |
For fk_sentinel_rate: at least one of column or columns is
required, and at least one of sentinel or sentinels is required.
assertions:
  - type: row_count
    min: 1
    severity: error
  - type: column_not_null
    columns: [order_id, customer_id]
    severity: fatal
  - type: unique
    columns: [order_id]
  - type: expression
    expression: "count(CASE WHEN amount < 0 THEN 1 END) = 0"
    severity: warn
  - type: fk_sentinel_rate
    column: plant_id
    sentinel: -4
    max_rate: 0.05
    message: "plant FK invalid rate exceeded"
failure¶
Per-thread failure handling policy. Controls what happens to remaining threads in a weave when this thread fails.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| on_failure | string | no | "abort_weave" | Policy: "abort_weave", "skip_downstream", "continue" |
- abort_weave -- Stop all remaining threads in the weave immediately.
- skip_downstream -- Skip only threads that depend on the failed thread.
- continue -- Continue executing unrelated threads.
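For example, to let unrelated threads keep running after a failure in this thread:

failure:
  on_failure: continue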
execution¶
Runtime execution settings. These cascade from loom to weave to thread, with the most specific level winning.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| log_level | string | no | "standard" | Logging verbosity: "minimal", "standard", "verbose", "debug" |
| trace | bool | no | true | Collect execution spans for telemetry |
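For example, to turn up logging and disable span collection for a single thread:

execution:
  log_level: debug
  trace: false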
params¶
Typed parameter declarations. Parameters can be referenced in expressions using
${param.name} syntax.
Each entry maps a parameter name to a ParamSpec:
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | yes | -- | Parameter name |
| type | string | yes | -- | Data type: "string", "int", "float", "bool", "date", "timestamp", "list[string]" |
| required | bool | no | true | Whether the parameter must be supplied at runtime |
| default | any | no | null | Default value when not supplied |
| description | string | no | "" | Human-readable description |
params:
  start_date:
    name: start_date
    type: date
    required: true
    description: "Start of the reporting window"
  region_filter:
    name: region_filter
    type: string
    required: false
    default: "ALL"
lookups¶
Thread-level lookup definitions. The structure is identical to weave-level lookups. When a thread and its parent weave define a lookup with the same name, the thread-level definition wins.
lookups:
  dim_region:
    source:
      type: delta
      alias: ref.dim_region
    key: [region_id]
    values: [region_name, country_code]
    materialize: true
    strategy: broadcast
column_sets¶
Thread-level column set definitions for rename steps. The structure is identical to weave-level column_sets. When the same name is defined at both thread and weave level, the thread-level definition wins.
column_sets:
  local_overrides:
    source:
      type: yaml
      path: mappings/thread_overrides.yaml
    on_unmapped: pass_through
variables¶
Thread-level typed variable declarations. Variables can be set by
sql_statement hook steps via set_var and referenced in downstream config
as ${var.name}. The structure is identical to
weave-level variables.
pre_steps / post_steps¶
Hook steps that run before or after the thread's core execution. The structure is identical to weave-level pre_steps / post_steps.
Thread-level hook lists are not cascaded from the parent weave. Each
level runs its own list independently. pre_steps run before any source is
read; post_steps run after write, assertions, and exports.
pre_steps:
  - type: quality_gate
    name: check_source_freshness
    check: source_freshness
    source: raw.orders
    max_age: "4h"

post_steps:
  - type: log_message
    message: "Thread complete. Rows written: ${var.row_count}."
    level: info
connections¶
Named OneLake connection declarations. Connections identify a Fabric workspace
and lakehouse by GUID or variable reference. Sources, targets, and exports can
then refer to a connection by name instead of embedding raw GUIDs or
abfss:// paths inline.
Connections cascade from loom to weave to thread. When the same name is defined at multiple levels, the most-specific level wins.
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| type | string | yes | -- | Connection type. Currently only "onelake" is supported. |
| workspace | string | yes | -- | OneLake workspace GUID or ${fabric.workspace_id} variable. |
| lakehouse | string | yes | -- | OneLake lakehouse GUID or ${fabric.lakehouse_id} variable. |
| default_schema | string | no | null | Default schema for tables in this connection. Can be overridden per-source or per-target with schema:. |
${fabric.workspace_id}, ${fabric.lakehouse_id}, and
${fabric.workspace_name} are injected at runtime from the
active Fabric session. Using these variables makes a connection
portable across environments without changing GUIDs.
See the Connections guide for cross-lakehouse examples, schema overrides, and migration patterns.
connections:
  raw:
    type: onelake
    workspace: "${fabric.workspace_id}"
    lakehouse: "${fabric.lakehouse_id}"
    default_schema: raw
  archive:
    type: onelake
    workspace: "a1b2c3d4-0000-0000-0000-111111111111"
    lakehouse: "e5f6a7b8-0000-0000-0000-222222222222"

sources:
  orders:
    connection: raw
    table: orders
  archived_orders:
    connection: archive
    schema: history
    table: orders

target:
  connection: raw
  table: fact_orders
Complete example¶
config_version: "1.0"

sources:
  orders:
    type: delta
    alias: raw_orders
    dedup:
      keys: [order_id]
      order_by: "updated_at DESC"
  customers:
    type: delta
    alias: dim_customers

steps:
  - filter:
      expr: "status != 'cancelled'"
  - join:
      source: customers
      type: left
      on: [customer_id]
  - derive:
      columns:
        total_amount: "quantity * unit_price"
        order_year: "year(order_date)"
  - select:
      columns:
        - order_id
        - customer_id
        - customer_name
        - total_amount
        - order_year
        - updated_at

target:
  alias: curated_orders
  partition_by: [order_year]
  naming:
    columns: snake_case

write:
  mode: merge
  match_keys: [order_id]
  on_match: update
  on_no_match_target: insert

keys:
  business_key: [order_id]
  surrogate_key:
    name: sk_order
  change_detection:
    name: change_hash
    columns: [total_amount, customer_name]

load:
  mode: incremental_watermark
  watermark_column: updated_at
  watermark_type: timestamp

validations:
  - name: positive_total
    rule: "total_amount >= 0"
    severity: error

assertions:
  - type: row_count
    min: 1
    severity: error
  - type: unique
    columns: [order_id]

failure:
  on_failure: skip_downstream

execution:
  log_level: standard
  trace: true

tags: [orders, curated]