Add a Thread¶
Goal: Define a thread YAML that reads from one or more sources, applies transformation steps, and writes to a Delta target. Then wire it into a weave and verify the output.
Prerequisites¶
- A working weevr project with the conventional directory structure
- At least one weave to add the thread to
- Access to the source data (Delta table, CSV, Parquet, or other Spark-readable format)
Step 1 -- Define sources¶
Declare each data source your thread needs. The first source listed becomes the primary DataFrame that flows through the pipeline.
```yaml
config_version: "1.0"
sources:
  orders:
    type: csv
    path: data/orders.csv
    options:
      header: "true"
      delimiter: ","
  customers:
    type: delta
    alias: bronze.customers
```
Source types include delta, csv, json, parquet, and excel. Delta
sources use alias for table resolution; file sources use path.
Step 2 -- Define transformation steps¶
Steps execute top-to-bottom. Each step has a single key indicating the operation type:
```yaml
steps:
  - filter:
      expr: "order_total > 0"
  - join:
      source: customers
      type: left
      on:
        - customer_id
  - derive:
      columns:
        order_year: "year(order_date)"
        customer_name: "concat(first_name, ' ', last_name)"
  - rename:
      columns:
        order_total: total_amount
  - cast:
      columns:
        total_amount: "decimal(10,2)"
        order_year: "int"
  - select:
      columns:
        - order_id
        - customer_id
        - customer_name
        - total_amount
        - order_year
```
Available step types include filter, derive, join, select, drop,
rename, cast, dedup, sort, union, aggregate, window, pivot,
unpivot, case_when, fill_null, coalesce, string_ops, and date_ops.
Step 3 -- Define the target¶
Set the write destination. With mapping_mode: auto (the default), columns whose
names match the target flow through automatically. For explicit control over
every column, use mapping_mode: explicit with a columns map.
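A minimal target section might look like the sketch below. The target table name (gold.fact_orders) and the exact keys of the target block are illustrative assumptions, not part of the example above:

```yaml
target:
  type: delta
  alias: gold.fact_orders   # illustrative target table
  mapping_mode: auto        # matching column names flow through

# Explicit variant: every written column is listed source -> target
# target:
#   type: delta
#   alias: gold.fact_orders
#   mapping_mode: explicit
#   columns:
#     order_id: order_id
#     total_amount: total_amount
```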
Step 4 -- Configure write behavior¶
Specify how data lands in the target. Available modes: overwrite (replace all
data), append (add rows), and merge (upsert). Merge mode requires match_keys
and supports on_match, on_no_match_target, and on_no_match_source options.
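For example, a merge write keyed on order_id could be sketched as follows. The option values (update, insert) are assumptions about the schema; only the mode names and option keys come from the text above:

```yaml
write:
  mode: merge            # upsert into the target
  match_keys:
    - order_id           # rows are matched on this key
  on_match: update       # illustrative value
  on_no_match_target: insert
```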
Step 5 -- Add the thread to a weave¶
Reference your new thread in an existing or new weave config. Thread references use paths relative to the project root, with typed extensions. The reference resolves to the orders/fact_orders.thread file in the project directory.
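A weave config referencing the thread might look like this sketch; the threads key and the surrounding weave schema are assumptions, only the orders/fact_orders.thread path comes from the text:

```yaml
config_version: "1.0"
threads:
  - orders/fact_orders.thread   # path relative to the project root
```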
Step 6 -- Run and verify¶
Execute the weave and check the result:
```python
from weevr import Context

# spark: an active SparkSession
ctx = Context(spark, "my-project.weevr")
result = ctx.run("orders.weave")
assert result.status == "success"
print(result.summary())
```
Use mode="preview" to test transforms against sampled data without writing.
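For example, assuming ctx is the Context from step 6 and that run accepts mode as a keyword argument:

```python
# Runs the transforms on sampled data; the target is not written.
preview = ctx.run("orders.weave", mode="preview")
print(preview.summary())
```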
Putting it together¶
Combine the sections above into a single file at
orders/fact_orders.thread. The complete structure is:
config_version at the top, then sources, steps, target, and write.
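Assembled, the file skeleton looks like the sketch below. The sources and steps sections are abbreviated from steps 1 and 2; the target and write values are illustrative assumptions:

```yaml
config_version: "1.0"

sources:
  orders:
    type: csv
    path: data/orders.csv
    options:
      header: "true"
      delimiter: ","
  customers:
    type: delta
    alias: bronze.customers

steps:
  - filter:
      expr: "order_total > 0"
  # ... remaining steps from step 2 ...

target:
  type: delta
  alias: gold.fact_orders   # illustrative target table
  mapping_mode: auto

write:
  mode: merge
  match_keys:
    - order_id
```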