
Release Notes — v1.16

Release date: April 2026

This release adds a new watermark_format field to the load: block that lets you declare the parse pattern for a string-typed watermark column. Sources that land change timestamps as text — SAP extracts via Fabric Open Database Mirror, mainframe files, JSON dumps — can now drive incremental and generic-CDC reads reliably, without depending on Spark's implicit string-to-timestamp coercion or lexicographic ordering.


Declarative parse intent for string timestamps

Prior to v1.16, weevr built the watermark filter as col > lit(last_value).cast('timestamp') and captured the new high-water mark via max(col). For a real Delta TIMESTAMP column this is perfect: the filter pushes down to file stats and max() returns a true chronological maximum. For a string column the same code path has two quiet failure modes:

  1. The filter depends on Spark's implicit string parser, which accepts some shapes and silently drops others as NULL — the user has no way to say "this string is in this format".
  2. max() over a string column returns the lexicographic maximum, which matches the chronological maximum only for fixed-width year-first formats. Anything else — trailing Z, variable fractional-second widths, offset notation — can silently advance the HWM to the wrong row.
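The second failure mode is easy to reproduce in plain Python (illustrative timestamps; Spark's string comparison is likewise byte-wise):

```python
from datetime import datetime

# Two change timestamps in offset notation. Chronologically, the "Z" row
# is later (10:00+02:00 is 08:00 UTC), but byte-wise string comparison
# picks the other row as the maximum.
rows = ["2026-01-02T10:00:00+02:00", "2026-01-02T09:30:00Z"]

lexicographic_max = max(rows)
chronological_max = max(rows, key=lambda s: datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z"))

print(lexicographic_max)   # 2026-01-02T10:00:00+02:00  (wrong HWM)
print(chronological_max)   # 2026-01-02T09:30:00Z       (true maximum)
```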

watermark_format fixes both at the source. When set, the engine wraps the column once in to_timestamp(col, watermark_format) for watermark_type: timestamp or to_date(col, watermark_format) for watermark_type: date, then reuses the typed expression for both the filter predicate and the HWM aggregate. A single shared helper, _typed_watermark_col, keeps the incremental_watermark and generic CDC code paths in lockstep.
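As a rough pure-Python analogue of that shared-expression idea (the real helper returns Spark Column expressions via to_timestamp/to_date; the names typed_watermark and SPARK_TO_STRPTIME below are illustrative, not the library's code):

```python
from datetime import datetime

# Illustrative mapping from a Spark pattern to its strptime equivalent;
# the engine passes the Spark pattern straight through to to_timestamp/to_date.
SPARK_TO_STRPTIME = {"yyyy-MM-dd HH:mm:ss": "%Y-%m-%d %H:%M:%S"}

def typed_watermark(raw, watermark_type, watermark_format=None):
    """Parse one raw value the way the typed column expression would."""
    if watermark_format is None:
        return raw  # native temporal column: no wrapping
    try:
        parsed = datetime.strptime(raw, SPARK_TO_STRPTIME[watermark_format])
    except ValueError:
        return None  # models to_timestamp's NULL on a non-matching string
    return parsed.date() if watermark_type == "date" else parsed

# Both the filter predicate and the HWM aggregate reuse the same typed value:
rows = ["2026-01-02 10:00:00", "2026-01-02 11:30:00"]
hwm = datetime(2026, 1, 2, 10, 30)
fmt = "yyyy-MM-dd HH:mm:ss"

newer = [r for r in rows
         if (t := typed_watermark(r, "timestamp", fmt)) is not None and t > hwm]
new_hwm = max(typed_watermark(r, "timestamp", fmt) for r in newer)
```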

load:
  mode: cdc
  cdc:
    operation_column: OPFLAG
    insert_value: "I"
    update_value: "U"
    delete_value: "D"
    on_delete: soft_delete
  watermark_column: AEDATTM
  watermark_type: timestamp
  watermark_format: "yyyy-MM-dd HH:mm:ss.SSSSSX"

The same field works identically in mode: incremental_watermark, for threads that use an append-only event log with a string timestamp.
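A minimal sketch of that shape (the column name event_ts and the pattern are illustrative, not defaults):

```yaml
load:
  mode: incremental_watermark
  watermark_column: event_ts
  watermark_type: timestamp
  watermark_format: "yyyy-MM-dd'T'HH:mm:ssX"
```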

What stays the same

  • Delta-typed TIMESTAMP and DATE columns should not set watermark_format. The implicit-cast path is already correct and keeps Delta predicate pushdown available — wrapping a native temporal column in to_timestamp would defeat file skipping.
  • Existing thread configs that don't set watermark_format are byte-identical to v1.15. A snapshot regression guard locks the legacy build_watermark_filter expression shape.
  • The Delta CDF preset (cdc.preset: delta_cdf) is unchanged and still ignores column-based watermarks — CDF's commit-version tracking is the incremental mechanism for that path.
  • Numeric watermark types (int, long) are unaffected. watermark_format is meaningless for them and is rejected at config-parse time if you try to pair them.
  • Failure semantics mirror v1.15: HWM is persisted only after a successful write, and a mid-run failure leaves the prior HWM in place so the next run idempotently reprocesses the same window.

Canonical ISO HWM, decoupled from the source format

The persisted high-water mark is stored as the Python str() of the Spark Timestamp — a canonical ISO form like "2026-01-02 10:00:00.12345" — regardless of the source format. On the next run the stored HWM is cast back through the existing F.lit(...).cast(watermark_type) path, which accepts canonical ISO for both timestamp and date. This decouples persisted state from source layout: changing watermark_format on a later run does not require replaying state, and the watermark store format stays stable across schema evolution on the source side.
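A pure-Python stand-in for that round-trip (Spark hands TIMESTAMP values to Python as datetime objects; fromisoformat here models the lit(...).cast step on the next run):

```python
from datetime import datetime

# HWM captured from the typed max(), as a Python datetime
captured = datetime(2026, 1, 2, 10, 0, 0, 123450)

persisted = str(captured)                     # canonical ISO text in the HWM store
restored = datetime.fromisoformat(persisted)  # next run: cast back from the store

assert persisted == "2026-01-02 10:00:00.123450"
assert restored == captured
```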

Silent drop for unparseable rows

Rows whose values fail the declared pattern resolve to NULL under Spark's default to_timestamp/to_date semantics. NULL fails the col > hwm comparison and is skipped by max(), so unparseable rows are silently excluded from both the filter and the HWM. This matches the principle of "make the first-run symptom loud": if the pattern is completely wrong, the first run reads zero rows, captures no HWM, persists nothing, and the thread telemetry shows the drop clearly.

A targeted DEBUG log line is emitted from the reader whenever watermark_format is active, capturing the thread, watermark column, format, prior HWM, and new HWM. Enable DEBUG on weevr.operations.readers to see it:

import logging
logging.getLogger("weevr.operations.readers").setLevel(logging.DEBUG)

The log line fires symmetrically from both read_source_incremental and read_cdc_source so downstream log analysis can correlate watermark behavior across load modes.
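The NULL semantics can be modeled in a few lines of Python (strptime standing in for to_timestamp; the data and pattern are illustrative):

```python
from datetime import datetime

def parse_or_none(raw, fmt):
    """Model of to_timestamp(col, format): NULL (None) on any mismatch."""
    try:
        return datetime.strptime(raw, fmt)
    except ValueError:
        return None

# Source lands day-first strings, but the declared pattern is year-first:
rows = ["02/01/2026 10:00:00", "02/01/2026 11:00:00"]
typed = [parse_or_none(r, "%Y-%m-%d %H:%M:%S") for r in rows]

survivors = [t for t in typed if t is not None]  # NULL fails col > hwm
print(survivors)  # [] -> first run reads zero rows and persists no HWM
```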


Configuration summary

One new field and one new cross-field rule on LoadConfig:

  • watermark_format (str, default None): Spark DateTimeFormatter pattern used to parse a string-typed watermark_column. Only valid with watermark_type of "timestamp" or "date".

Cross-field rules now enforced by LoadConfig:

  • Setting watermark_format without watermark_column is rejected at config load.
  • Pairing watermark_format with watermark_type: int or long is rejected at config load.
  • Pairing watermark_format with watermark_type: timestamp or date is accepted.
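The rules can be modeled as a small validator (a hypothetical function, not the actual LoadConfig implementation):

```python
def validate_watermark_format(watermark_column=None, watermark_type=None,
                              watermark_format=None):
    """Model of the cross-field rules enforced at config-parse time."""
    if watermark_format is None:
        return  # field unset: nothing to check
    if watermark_column is None:
        raise ValueError("watermark_format requires watermark_column")
    if watermark_type not in ("timestamp", "date"):
        raise ValueError("watermark_format is only valid with timestamp or date")

# Accepted pairing:
validate_watermark_format("AEDATTM", "timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSX")
```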

See the load configuration reference for the full field table and the execution modes guide for a worked SAP-style example including the "zero rows through" diagnostic workflow.

Internal changes

  • New module-private _typed_watermark_col(column, type, format) helper in src/weevr/operations/readers.py that returns a bare F.col when no format is set, wraps in to_timestamp for timestamp type, wraps in to_date for date type, and raises ValueError for any other type as a defensive guard.
  • build_watermark_filter gained an optional keyword-only watermark_format parameter, default None. The signature change is backwards compatible — existing callers that do not pass the argument see no change in behavior.
  • read_source_incremental and the generic-CDC branch of read_cdc_source thread load_config.watermark_format through to both the filter and the HWM aggregate. The CDF preset branch was intentionally left untouched.
  • src/weevr/engine/executor.py was not modified — both reader functions already took load_config, so the new field flows through implicitly.
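The keyword-only extension can be sketched as follows (a hypothetical body that builds a SQL-string predicate; the real function returns a Spark Column expression):

```python
def build_watermark_filter(column, last_value, watermark_type="timestamp",
                           *, watermark_format=None):
    """Model of the extended signature; watermark_format is keyword-only."""
    typed = (column if watermark_format is None
             else f"to_{watermark_type}({column}, '{watermark_format}')")
    return f"{typed} > CAST('{last_value}' AS {watermark_type})"

# Existing callers (no new argument) get the legacy expression shape:
legacy = build_watermark_filter("AEDATTM", "2026-01-02 10:00:00")
print(legacy)  # AEDATTM > CAST('2026-01-02 10:00:00' AS timestamp)
```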

Backwards compatibility

  • Existing configurations keep working without changes.
  • No new state schema. Persisted HWM round-trips through both table_properties and metadata_table backends in the same canonical ISO form that the engine already used.
  • No new credentials, no new session-state mutation. The engine does not pin spark.sql.legacy.timeParserPolicy; sessions that override it observe whatever parse semantics Spark applies under that policy.