Skip to content

API Reference

The building blocks of Veridelta. Explore the configuration schemas, column rules, and the core diffing engine.

Configuration Models

These models define how you configure a Veridelta comparison. You can pass these directly via Python, or define them in a YAML file.

Data models for Veridelta configuration and results.

ColumnRule

Bases: BaseModel

Specific overrides for one or more columns using exact names or regex.

Attributes:

Name Type Description
column_names list[str]

Exact names of the columns in the source dataset.

pattern str | None

Regex pattern to match multiple columns (e.g., '^AMT_.*').

absolute_tolerance float | None

The maximum allowed absolute difference.

relative_tolerance float | None

The maximum allowed relative difference (e.g., 0.01 for 1%).

case_insensitive bool | None

If True, ignores case differences in strings.

whitespace_mode WhitespaceMode | None

Granular control over stripping leading/trailing whitespace.

regex_replace dict[str, str] | None

Dictionary of {pattern: replacement} to sanitize text before comparison.

pad_zeros int | None

Left-pad numeric strings to this exact length (e.g., 5 -> '00123').

value_map dict[str, str] | None

Translate Source values to Target values before comparison (e.g., {'M': 'Male'}).

null_values list[str] | None

Specific string values to actively coerce to NULL (e.g., ['N/A', '-999']).

treat_null_as_equal bool | None

If True, evaluates NULL == NULL as a successful match.

datetime_format str | None

Expected strptime format for dates (e.g., '%Y-%m-%d %H:%M:%S').

timezone str | None

Target timezone to normalize dates to before comparison.

cast_to str | None

Explicitly cast column to this Polars datatype (e.g., 'Float64').

ignore bool

Whether to skip this column entirely during comparison.

rename_to str | None

The name in the target dataset if it differs from the source.

Source code in src/veridelta/models.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class ColumnRule(BaseModel):
    """Per-column comparison overrides, selected by exact names or a regex.

    Attributes:
        column_names: Exact names of the columns in the source dataset.
        pattern: Regex pattern to match multiple columns (e.g., '^AMT_.*').
        absolute_tolerance: The maximum allowed absolute difference.
        relative_tolerance: The maximum allowed relative difference (e.g., 0.01 for 1%).
        case_insensitive: If True, ignores case differences in strings.
        whitespace_mode: Granular control over stripping leading/trailing whitespace.
        regex_replace: Dictionary of {pattern: replacement} to sanitize text before comparison.
        pad_zeros: Left-pad numeric strings to this exact length (e.g., 5 -> '00123').
        value_map: Translate Source values to Target values before comparison (e.g., {'M': 'Male'}).
        null_values: Specific string values to actively coerce to NULL (e.g., ['N/A', '-999']).
        treat_null_as_equal: If True, evaluates NULL == NULL as a successful match.
        datetime_format: Expected strptime format for dates (e.g., '%Y-%m-%d %H:%M:%S').
        timezone: Target timezone to normalize dates to before comparison.
        cast_to: Explicitly cast column to this Polars datatype (e.g., 'Float64').
        ignore: Whether to skip this column entirely during comparison.
        rename_to: The name in the target dataset if it differs from the source.
    """

    # -- Column selection ---------------------------------------------------
    column_names: list[str] = Field(
        default_factory=list,
        description="Exact names of the columns in the source.",
    )
    pattern: str | None = Field(
        default=None,
        description="Regex pattern to match multiple columns (e.g., '^AMT_.*').",
    )

    # -- Numeric tolerances -------------------------------------------------
    absolute_tolerance: float | None = Field(
        default=None,
        ge=0.0,
        description="Absolute tolerance for numeric differences.",
    )
    relative_tolerance: float | None = Field(
        default=None,
        ge=0.0,
        description="Relative tolerance (e.g., 0.01 for 1%).",
    )

    # -- String normalization -----------------------------------------------
    case_insensitive: bool | None = Field(
        default=None,
        description="Ignore case for string comparisons.",
    )
    whitespace_mode: WhitespaceMode | None = Field(
        default=None,
        description="Granular control over whitespace stripping.",
    )
    regex_replace: dict[str, str] | None = Field(
        default=None,
        description="Dictionary of {regex_pattern: replacement_string} to sanitize text.",
    )
    pad_zeros: int | None = Field(
        default=None,
        ge=0,
        description="Left-pad numeric strings to this length (e.g., 5 -> '00123').",
    )

    # -- Value translation and NULL handling --------------------------------
    value_map: dict[str, str] | None = Field(
        default=None,
        description="Translate Source values to Target values (e.g., {'M': 'Male'}).",
    )
    null_values: list[str] | None = Field(
        default=None,
        description="Specific string values to treat as NULL (e.g., ['N/A', '-999']).",
    )
    treat_null_as_equal: bool | None = Field(
        default=None,
        description="Treat missing values (NULL/None) in both sources as a match.",
    )

    # -- Datetime handling ---------------------------------------------------
    datetime_format: str | None = Field(
        default=None,
        description="Expected strptime format (e.g., '%Y-%m-%d %H:%M:%S').",
    )
    timezone: str | None = Field(
        default=None,
        description="Target timezone to normalize dates to before comparison.",
    )

    # -- Casting, exclusion, and renaming ------------------------------------
    cast_to: str | None = Field(
        default=None,
        description="Explicitly cast column to this Polars datatype (e.g., 'Float64').",
    )
    ignore: bool = Field(
        default=False,
        description="If True, this column will be excluded from the comparison.",
    )
    rename_to: str | None = Field(
        default=None,
        description="Name in target dataset if different (use only for single columns).",
    )

    @field_validator("pattern")
    @classmethod
    def validate_pattern(cls, v: str | None) -> str | None:
        """Reject configurations whose `pattern` is not a compilable regex.

        Args:
            v: The string regex pattern to validate, or None.

        Returns:
            The validated regex string, or None if not provided.

        Raises:
            ValueError: If the regex pattern cannot be compiled.
        """
        if v is None:
            return v
        try:
            re.compile(v)
        except re.error as err:
            raise ValueError(f"Invalid regex pattern '{v}': {err}") from err
        return v

    @field_validator("regex_replace")
    @classmethod
    def validate_regex_replace(cls, v: dict[str, str] | None) -> dict[str, str] | None:
        """Reject configurations where any `regex_replace` key is not a valid regex.

        Args:
            v: A dictionary mapping regex patterns to their replacements, or None.

        Returns:
            The validated dictionary, or None if not provided.

        Raises:
            ValueError: If any key in the dictionary is an invalid regex pattern.
        """
        if v is None:
            return v
        for pat in v:
            try:
                re.compile(pat)
            except re.error as err:
                raise ValueError(f"Invalid regex replace pattern '{pat}': {err}") from err
        return v

validate_pattern(v) classmethod

Ensure the provided regex pattern is a valid expression at configuration time.

Parameters:

Name Type Description Default
v str | None

The string regex pattern to validate, or None.

required

Returns:

Type Description
str | None

The validated regex string, or None if not provided.

Raises:

Type Description
ValueError

If the regex pattern cannot be compiled.

Source code in src/veridelta/models.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@field_validator("pattern")
@classmethod
def validate_pattern(cls, v: str | None) -> str | None:
    """Reject configurations whose `pattern` is not a compilable regex.

    Args:
        v: The string regex pattern to validate, or None.

    Returns:
        The validated regex string, or None if not provided.

    Raises:
        ValueError: If the regex pattern cannot be compiled.
    """
    if v is None:
        return v
    try:
        re.compile(v)
    except re.error as err:
        raise ValueError(f"Invalid regex pattern '{v}': {err}") from err
    return v

validate_regex_replace(v) classmethod

Ensure all keys in the regex replacement dictionary are valid regex patterns.

Parameters:

Name Type Description Default
v dict[str, str] | None

A dictionary mapping regex patterns to their replacements, or None.

required

Returns:

Type Description
dict[str, str] | None

The validated dictionary, or None if not provided.

Raises:

Type Description
ValueError

If any key in the dictionary is an invalid regex pattern.

Source code in src/veridelta/models.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@field_validator("regex_replace")
@classmethod
def validate_regex_replace(cls, v: dict[str, str] | None) -> dict[str, str] | None:
    """Reject configurations where any `regex_replace` key is not a valid regex.

    Args:
        v: A dictionary mapping regex patterns to their replacements, or None.

    Returns:
        The validated dictionary, or None if not provided.

    Raises:
        ValueError: If any key in the dictionary is an invalid regex pattern.
    """
    if v is None:
        return v
    for pat in v:
        try:
            re.compile(pat)
        except re.error as err:
            raise ValueError(f"Invalid regex replace pattern '{pat}': {err}") from err
    return v

DiffConfig

Bases: BaseModel

The master configuration for a Veridelta comparison run.

Attributes:

Name Type Description
source SourceConfig

Configuration for the 'Left' dataset.

target SourceConfig

Configuration for the 'Right' dataset.

primary_keys list[str]

Columns used to join and align the datasets.

schema_mode SchemaMode

How strictly to enforce column existence and matching between sources.

strict_types bool

If False, the engine will attempt to cast target columns to source types.

normalize_column_names bool

If True, strips whitespace and lowercases all column headers before processing.

default_absolute_tolerance float

Global absolute tolerance for all numeric columns.

default_relative_tolerance float

Global relative tolerance for all numeric columns.

default_treat_null_as_equal bool

Global setting for handling NULL == NULL.

default_whitespace_mode WhitespaceMode

Global setting for stripping whitespace in strings.

default_null_values list[str]

Global list of string values to aggressively coerce to NULL.

rules list[ColumnRule]

List of per-column comparison overrides.

threshold float

Allowed mismatch percentage (0.0 to 1.0) before failure.

output_path str | None

Path to save the resulting diff report and artifacts.

output_format SourceType

Format for exporting diff artifacts (e.g., parquet, csv).

Source code in src/veridelta/models.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
class DiffConfig(BaseModel):
    """The master configuration for a Veridelta comparison run.

    Attributes:
        source: Configuration for the 'Left' dataset.
        target: Configuration for the 'Right' dataset.
        primary_keys: Columns used to join and align the datasets.
        schema_mode: How strictly to enforce column existence and matching between sources.
        strict_types: If False, the engine will attempt to cast target columns to source types.
        normalize_column_names: If True, strips whitespace and lowercases all column
            headers (and the config's own keys) before processing.
        default_absolute_tolerance: Global absolute tolerance for all numeric columns.
        default_relative_tolerance: Global relative tolerance for all numeric columns.
        default_treat_null_as_equal: Global setting for handling NULL == NULL.
        default_whitespace_mode: Global setting for stripping whitespace in strings.
        default_null_values: Global list of string values to aggressively coerce to NULL.
        rules: List of per-column comparison overrides.
        threshold: Allowed mismatch percentage (0.0 to 1.0) before failure.
        output_path: Path to save the resulting diff report and artifacts.
        output_format: Format for exporting diff artifacts (e.g., parquet, csv).
    """

    source: SourceConfig = Field(..., description="Config for the 'Left' dataset.")
    target: SourceConfig = Field(..., description="Config for the 'Right' dataset.")

    primary_keys: list[str] = Field(..., description="Columns used to join datasets.")

    schema_mode: SchemaMode = Field(
        default=SchemaMode.INTERSECTION, description="How strictly to enforce column existence."
    )
    strict_types: bool = Field(
        default=False,
        description="If False, engine attempts to safely cast Target columns to Source types.",
    )

    normalize_column_names: bool = Field(
        default=False,
        description="If True, strips whitespace and lowercases all column headers before processing.",
    )

    default_absolute_tolerance: float = Field(
        default=0.0, ge=0.0, description="Global absolute tolerance for numeric columns."
    )
    default_relative_tolerance: float = Field(
        default=0.0, ge=0.0, description="Global relative tolerance for numeric columns."
    )
    default_treat_null_as_equal: bool = Field(
        default=True, description="Globally treat NULL == NULL as a match."
    )
    default_whitespace_mode: WhitespaceMode = Field(
        default=WhitespaceMode.NONE, description="Global string whitespace stripping behavior."
    )
    default_null_values: list[str] = Field(
        default_factory=list, description="Global list of string values to coerce to NULL."
    )

    rules: list[ColumnRule] = Field(default_factory=list, description="Column overrides.")

    threshold: float = Field(
        default=0.0, ge=0.0, le=1.0, description="Allowed mismatch percentage (0.0 to 1.0)."
    )

    output_path: str | None = Field(
        default=None, description="Optional path to save the detailed diff report."
    )
    output_format: SourceType = Field(
        default=SourceType.PARQUET,
        description="Format for exporting diff artifacts (e.g., parquet, csv).",
    )

    @model_validator(mode="after")
    def apply_schema_normalization(self) -> "DiffConfig":
        """Automatically lowercase user config keys if normalization is enabled."""
        if self.normalize_column_names:
            self.primary_keys = [pk.strip().lower() for pk in self.primary_keys]

            for rule in self.rules:
                rule.column_names = [col.strip().lower() for col in rule.column_names]

                # do NOT touch rule.pattern (regex is intentionally case-sensitive)
                # do NOT touch rule.rename_to (user might want to rename to a capitalized word)
                # NOTE(review): when normalization is on, dataset headers are also
                # lowercased at ingestion, so a capitalized rename_to may never align
                # with the (lowercased) target headers — confirm this interplay.

        return self

apply_schema_normalization()

Automatically lowercase user config keys if normalization is enabled.

Source code in src/veridelta/models.py
276
277
278
279
280
281
282
283
284
285
286
287
288
@model_validator(mode="after")
def apply_schema_normalization(self) -> "DiffConfig":
    """Automatically lowercase user config keys if normalization is enabled."""
    if not self.normalize_column_names:
        return self

    self.primary_keys = [key.strip().lower() for key in self.primary_keys]
    for rule in self.rules:
        # Only exact column names are normalized; rule.pattern stays untouched
        # (regex is intentionally case-sensitive) and rule.rename_to stays
        # untouched (user might want to rename to a capitalized word).
        rule.column_names = [name.strip().lower() for name in rule.column_names]

    return self

DiffSummary

Bases: BaseModel

The high-level results of a Veridelta comparison.

Attributes:

Name Type Description
total_rows_source int

Number of rows in the source dataset.

total_rows_target int

Number of rows in the target dataset.

added_count int

Rows found only in the target (missing from source).

removed_count int

Rows found only in the source (missing from target).

changed_count int

Rows present in both datasets but with value differences.

is_match bool

Boolean indicating if the overall diff falls within the allowed threshold.

Source code in src/veridelta/models.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
class DiffSummary(BaseModel):
    """The high-level results of a Veridelta comparison.

    Attributes:
        total_rows_source: Number of rows in the source dataset.
        total_rows_target: Number of rows in the target dataset.
        added_count: Rows found only in the target (missing from source).
        removed_count: Rows found only in the source (missing from target).
        changed_count: Rows present in both datasets but with value differences.
        is_match: Boolean indicating if the overall diff falls within the allowed threshold.
    """

    # Raw row counts of each input dataset.
    total_rows_source: int
    total_rows_target: int
    # Key-level differences: rows present in exactly one of the two datasets.
    added_count: int
    removed_count: int
    # Rows present in both datasets whose compared values differ.
    changed_count: int
    # True when the overall mismatch falls within the configured threshold.
    is_match: bool

SchemaMode

Bases: str, Enum

Defines how strictly the engine enforces column schemas between datasets.

Source code in src/veridelta/models.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class SchemaMode(str, Enum):
    """Defines how strictly the engine enforces column schemas between datasets.

    Inherits from ``str`` so members compare and serialize as their plain
    string values (convenient for YAML/JSON configs).
    """

    EXACT = "exact"
    """Strict 1:1 mapping. Columns must be identical and in the exact same order."""

    RELAXED_ORDER = "relaxed_order"
    """Strict 1:1 mapping, but order-agnostic. Columns must be identical, but sequence does not matter."""

    ALLOW_ADDITIONS = "allow_additions"
    """Target can have new columns, but must contain every column present in the Source."""

    ALLOW_REMOVALS = "allow_removals"
    """Target is allowed to drop legacy columns, but cannot add any new columns."""

    INTERSECTION = "intersection"
    """Only diff columns that exist in both datasets, ignoring all others. (Default)"""

ALLOW_ADDITIONS = 'allow_additions' class-attribute instance-attribute

Target can have new columns, but must contain every column present in the Source.

ALLOW_REMOVALS = 'allow_removals' class-attribute instance-attribute

Target is allowed to drop legacy columns, but cannot add any new columns.

EXACT = 'exact' class-attribute instance-attribute

Strict 1:1 mapping. Columns must be identical and in the exact same order.

INTERSECTION = 'intersection' class-attribute instance-attribute

Only diff columns that exist in both datasets, ignoring all others. (Default)

RELAXED_ORDER = 'relaxed_order' class-attribute instance-attribute

Strict 1:1 mapping, but order-agnostic. Columns must be identical, but sequence does not matter.

SourceConfig

Bases: BaseModel

Configuration for a specific data source.

Attributes:

Name Type Description
path str

File system path or URI to the data.

format SourceType

The format of the file (e.g., CSV, Parquet).

options dict[str, Any]

Format-specific kwargs passed to the loader.

Source code in src/veridelta/models.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class SourceConfig(BaseModel):
    """Configuration describing a single data source (the 'Left' or 'Right' side).

    Attributes:
        path: File system path or URI to the data.
        format: The format of the file (e.g., CSV, Parquet).
        options: Format-specific kwargs passed to the loader.
    """

    # Where the data lives; required, no default.
    path: str = Field(..., description="File system path or URI to the data.")
    # How to parse it; CSV unless told otherwise.
    format: SourceType = Field(
        default=SourceType.CSV,
        description="The format of the file.",
    )
    # Passed straight through to the loader as keyword arguments.
    options: dict[str, Any] = Field(
        default_factory=dict,
        description="Format-specific options (e.g., {'separator': ';'}).",
    )

SourceType

Bases: str, Enum

Supported and roadmap data formats for ingestion.

Source code in src/veridelta/models.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class SourceType(str, Enum):
    """Supported and roadmap data formats for ingestion.

    NOTE(review): this enum mixes currently-supported formats with roadmap
    entries; only a CSV loader is visible in the engine module here — confirm
    which other formats have registered loaders before relying on them.
    """

    CSV = "csv"
    JSON = "json"
    PARQUET = "parquet"
    FIXED_WIDTH = "fixed_width"
    NETCDF = "netcdf"
    SHAPEFILE = "shapefile"
    GEOPACKAGE = "geopackage"
    EXCEL = "excel"
    SQL = "sql"
    DELTA = "delta"
    AVRO = "avro"
    XML = "xml"
    ARROW = "arrow"

WhitespaceMode

Bases: str, Enum

Granular control over string whitespace stripping.

Source code in src/veridelta/models.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class WhitespaceMode(str, Enum):
    """Granular control over string whitespace stripping.

    Inherits from ``str`` so config values like "both" map directly to members.
    """

    NONE = "none"
    """Do not strip any whitespace."""

    LEFT = "left"
    """Strip leading whitespace only."""

    RIGHT = "right"
    """Strip trailing whitespace only."""

    BOTH = "both"
    """Strip both leading and trailing whitespace."""

BOTH = 'both' class-attribute instance-attribute

Strip both leading and trailing whitespace.

LEFT = 'left' class-attribute instance-attribute

Strip leading whitespace only.

NONE = 'none' class-attribute instance-attribute

Do not strip any whitespace.

RIGHT = 'right' class-attribute instance-attribute

Strip trailing whitespace only.

The Engine

The core mathematical diffing engine, powered by Polars.

Core engine for data ingestion and alignment.

BaseLoader

Bases: ABC

Abstract base class for all data loaders.

Source code in src/veridelta/engine.py
24
25
26
27
28
29
30
class BaseLoader(ABC):
    """Common interface that every concrete data loader must implement."""

    @abstractmethod
    def load(self, config: SourceConfig) -> pl.DataFrame:
        """Read the dataset described by *config* into a Polars DataFrame."""
        ...

load(config) abstractmethod

Load data into a Polars DataFrame.

Source code in src/veridelta/engine.py
27
28
29
30
@abstractmethod
def load(self, config: SourceConfig) -> pl.DataFrame:
    """Read the dataset described by *config* into a Polars DataFrame."""
    ...

CSVLoader

Bases: BaseLoader

Loader for CSV files using Polars.

Source code in src/veridelta/engine.py
33
34
35
36
37
38
class CSVLoader(BaseLoader):
    """Concrete loader that reads CSV files via Polars."""

    def load(self, config: SourceConfig) -> pl.DataFrame:
        """Read the CSV at ``config.path``, forwarding ``config.options`` to Polars."""
        path, options = config.path, config.options
        return pl.read_csv(path, **options)

load(config)

Load a CSV file into a Polars DataFrame using the provided config options.

Source code in src/veridelta/engine.py
36
37
38
def load(self, config: SourceConfig) -> pl.DataFrame:
    """Read the CSV at ``config.path``, forwarding ``config.options`` to Polars."""
    path, options = config.path, config.options
    return pl.read_csv(path, **options)

DataIngestor

Coordinates loading and alignment of source and target datasets.

Source code in src/veridelta/engine.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class DataIngestor:
    """Coordinates loading and alignment of source and target datasets."""

    def __init__(self, config: DiffConfig) -> None:
        """Initialize the ingestor with a master configuration.

        Args:
            config: The master DiffConfig object.
        """
        self.config = config

    def _normalize_headers(self, df: pl.DataFrame) -> pl.DataFrame:
        """Strip and lowercase every column header when normalization is enabled."""
        if self.config.normalize_column_names:
            return df.rename({name: name.strip().lower() for name in df.columns})
        return df

    def _align_columns(self, df: pl.DataFrame, is_source: bool = True) -> pl.DataFrame:
        """Apply rule-driven renames and drop ignored columns (exact names or regex)."""
        renames: dict[str, str] = {}
        dropped: set[str] = set()

        for rule in self.config.rules:
            # Columns selected by this rule, whether by exact name or pattern.
            hits = [
                name
                for name in df.columns
                if name in rule.column_names or (rule.pattern and re.match(rule.pattern, name))
            ]

            if rule.ignore:
                dropped.update(hits)
            elif (
                is_source
                and rule.rename_to
                and len(rule.column_names) == 1
                and rule.column_names[0] in df.columns
            ):
                # Renames only apply source-side, and only for single-column rules.
                renames[rule.column_names[0]] = rule.rename_to

        return df.drop(list(dropped)).rename(renames)

    def get_dataframes(self) -> tuple[pl.DataFrame, pl.DataFrame]:
        """Load both datasets, normalize their headers, and align their columns."""
        left_loader = LoaderFactory.get_loader(self.config.source.format)
        right_loader = LoaderFactory.get_loader(self.config.target.format)

        left = left_loader.load(self.config.source)
        right = right_loader.load(self.config.target)

        left = self._normalize_headers(left)
        right = self._normalize_headers(right)

        return (
            self._align_columns(left, is_source=True),
            self._align_columns(right, is_source=False),
        )

__init__(config)

Initialize the ingestor with a master configuration.

Parameters:

Name Type Description Default
config DiffConfig

The master DiffConfig object.

required
Source code in src/veridelta/engine.py
71
72
73
74
75
76
77
def __init__(self, config: DiffConfig) -> None:
    """Initialize the ingestor with a master configuration.

    Args:
        config: The master DiffConfig object.
    """
    # Held for the header-normalization and column-alignment steps.
    self.config = config

get_dataframes()

Loads and aligns both datasets.

Source code in src/veridelta/engine.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def get_dataframes(self) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Load both datasets, normalize their headers, and align their columns."""
    left_loader = LoaderFactory.get_loader(self.config.source.format)
    right_loader = LoaderFactory.get_loader(self.config.target.format)

    left = left_loader.load(self.config.source)
    right = right_loader.load(self.config.target)

    left = self._normalize_headers(left)
    right = self._normalize_headers(right)

    return (
        self._align_columns(left, is_source=True),
        self._align_columns(right, is_source=False),
    )

DiffEngine

The core mathematical engine that calculates differences.

Source code in src/veridelta/engine.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
class DiffEngine:
    """The core mathematical engine that calculates differences."""

    def __init__(
        self, config: DiffConfig, source_df: pl.DataFrame, target_df: pl.DataFrame
    ) -> None:
        """Initialize the engine with datasets already aligned by the DataIngestor.

        Args:
            config: The master configuration.
            source_df: The aligned 'Left' dataset.
            target_df: The aligned 'Right' dataset.
        """
        self.config = config
        self.source = source_df
        self.target = target_df

    def _get_effective_rule(self, col_name: str) -> dict[str, Any]:
        """Resolves all rules (Specific > Pattern > Global) into a unified dictionary."""
        # Start from the global defaults; a matched rule overrides them below.
        eff = {
            "abs_tol": self.config.default_absolute_tolerance,
            "rel_tol": self.config.default_relative_tolerance,
            "treat_null": self.config.default_treat_null_as_equal,
            "whitespace": self.config.default_whitespace_mode,
            "case_insensitive": False,
            "regex_replace": None,
            "value_map": None,
            "cast_to": None,
            "ignore": False,
        }

        # Find first matching rule (Specific matches take priority over Patterns)
        matched_rule = None
        for rule in self.config.rules:
            if col_name in rule.column_names:
                matched_rule = rule
                break

        # Fall back to regex patterns only when no exact-name rule matched.
        # re.match anchors at the start of the column name.
        if not matched_rule:
            for rule in self.config.rules:
                if rule.pattern and re.match(rule.pattern, col_name):
                    matched_rule = rule
                    break

        if matched_rule:
            # Tolerance/normalization fields override defaults only when the
            # rule sets them explicitly (None means "inherit the default").
            if matched_rule.absolute_tolerance is not None:
                eff["abs_tol"] = matched_rule.absolute_tolerance
            if matched_rule.relative_tolerance is not None:
                eff["rel_tol"] = matched_rule.relative_tolerance
            if matched_rule.treat_null_as_equal is not None:
                eff["treat_null"] = matched_rule.treat_null_as_equal
            if matched_rule.whitespace_mode is not None:
                eff["whitespace"] = matched_rule.whitespace_mode
            if matched_rule.case_insensitive is not None:
                eff["case_insensitive"] = matched_rule.case_insensitive

            # These four are copied unconditionally: a matched rule's None/False
            # replaces the default rather than inheriting it.
            eff["regex_replace"] = matched_rule.regex_replace
            eff["value_map"] = matched_rule.value_map
            eff["cast_to"] = matched_rule.cast_to
            eff["ignore"] = matched_rule.ignore

        return eff

    def _check_uniqueness(self) -> None:
        """Verifies that primary keys are unique in both datasets to prevent join explosions."""
        # NOTE(review): imported locally rather than at module level —
        # presumably to avoid an import cycle; confirm.
        from veridelta.exceptions import DataIntegrityError

        pks = self.config.primary_keys

        if self.source.select(pks).is_duplicated().any():
            # Count the duplicated rows only to produce an actionable message.
            dupes = self.source.filter(self.source.select(pks).is_duplicated()).height
            raise DataIntegrityError(
                f"Primary keys {pks} are not unique in SOURCE dataset. "
                f"Found {dupes} duplicate rows. Clean your data before diffing."
            )

        if self.target.select(pks).is_duplicated().any():
            dupes = self.target.filter(self.target.select(pks).is_duplicated()).height
            raise DataIntegrityError(
                f"Primary keys {pks} are not unique in TARGET dataset. "
                f"Found {dupes} duplicate rows. Clean your data before diffing."
            )

    def _apply_string_rules(self, series: pl.Expr, rule: dict[str, Any]) -> pl.Expr:
        """Applies whitespace, casing, and regex cleaning to a string expression."""
        # Order matters: regex replacements run first, then whitespace
        # stripping, then case folding.
        if rule["regex_replace"]:
            for pattern, replacement in rule["regex_replace"].items():
                series = series.str.replace_all(pattern, replacement)

        mode = rule["whitespace"]
        if mode == WhitespaceMode.LEFT:
            series = series.str.strip_chars_start()
        elif mode == WhitespaceMode.RIGHT:
            series = series.str.strip_chars_end()
        elif mode == WhitespaceMode.BOTH:
            series = series.str.strip_chars()

        if rule["case_insensitive"]:
            series = series.str.to_lowercase()

        return series

    def _build_match_expr(self, col_name: str, rule: dict[str, Any], dtype: pl.DataType) -> pl.Expr:
        """Builds a smart comparison expression based on data type and user rules."""
        src = pl.col(f"{col_name}_source")
        tgt = pl.col(f"{col_name}_target")

        # value_map translates SOURCE values into the target vocabulary; the
        # target side is left untouched.
        if rule["value_map"]:
            src = src.replace(rule["value_map"])

        if dtype == pl.String or dtype == pl.Utf8:
            src = self._apply_string_rules(src, rule)
            tgt = self._apply_string_rules(tgt, rule)
            val_match = src == tgt

        elif dtype.is_numeric():
            if rule["abs_tol"] == 0.0 and rule["rel_tol"] == 0.0:
                # Zero tolerance: exact equality is cheaper than the
                # threshold arithmetic below.
                val_match = src == tgt
            else:
                # Match when |tgt - src| <= abs_tol + rel_tol * |src|
                # (relative tolerance is scaled by the source magnitude).
                abs_diff = (tgt - src).abs()
                threshold = rule["abs_tol"] + (rule["rel_tol"] * src.abs())
                val_match = abs_diff <= threshold

        else:
            # All other dtypes (dates, booleans, ...) compare by equality.
            val_match = src == tgt

        if rule["treat_null"]:
            # NULL == NULL counts as a match when configured.
            null_match = src.is_null() & tgt.is_null()
            return (val_match | null_match).fill_null(False)

        # A comparison involving NULL yields NULL in Polars; fill_null(False)
        # forces such rows to register as mismatches.
        return val_match.fill_null(False)

    def _validate_schema(self) -> None:
        """Enforces the configured SchemaMode before comparison."""
        source_cols = set(self.source.columns)
        target_cols = set(self.target.columns)

        pks = set(self.config.primary_keys)

        # Primary keys are mandatory on both sides regardless of schema mode.
        missing_pks_source = pks - source_cols
        if missing_pks_source:
            raise ConfigError(f"Primary keys missing in SOURCE: {missing_pks_source}")

        missing_pks_target = pks - target_cols
        if missing_pks_target:
            raise ConfigError(f"Primary keys missing in TARGET: {missing_pks_target}")

        if self.config.schema_mode == SchemaMode.EXACT:
            # EXACT compares ordered lists, so column order must match too.
            if self.source.columns != self.target.columns:
                raise ConfigError(
                    f"EXACT schema match failed.\nSource: {self.source.columns}\nTarget: {self.target.columns}"
                )

        elif self.config.schema_mode == SchemaMode.RELAXED_ORDER:
            # Same column sets required, order ignored.
            if source_cols != target_cols:
                missing = source_cols - target_cols
                extra = target_cols - source_cols
                raise ConfigError(
                    f"RELAXED_ORDER schema match failed.\nMissing in Target: {missing}\nExtra in Target: {extra}"
                )

        elif self.config.schema_mode == SchemaMode.ALLOW_ADDITIONS:
            # Target may add columns but must retain every source column.
            missing_in_target = source_cols - target_cols
            if missing_in_target:
                raise ConfigError(f"Target is missing required source columns: {missing_in_target}")

        elif self.config.schema_mode == SchemaMode.ALLOW_REMOVALS:
            # Target may drop columns but must not introduce new ones.
            extra_in_target = target_cols - source_cols
            if extra_in_target:
                raise ConfigError(
                    f"Target contains unauthorized additional columns: {extra_in_target}"
                )

    def run(self) -> DiffSummary:
        """Executes the comparison logic with safety checks and type alignment."""
        self._validate_schema()
        self._check_uniqueness()

        # Apply explicit per-column casts (rule 'cast_to') to both sides so
        # the comparison expressions see a single dtype.
        for col in self.source.columns:
            rule = self._get_effective_rule(col)
            if rule["cast_to"]:
                # Resolve the dtype name (e.g. 'Float64') against the polars
                # namespace; unknown names are silently skipped.
                dtype = getattr(pl, rule["cast_to"], None)
                if dtype:
                    if col in self.source.columns:
                        self.source = self.source.with_columns(pl.col(col).cast(dtype))
                    if col in self.target.columns:
                        self.target = self.target.with_columns(pl.col(col).cast(dtype))

        # Anti-joins on the primary keys isolate rows present on only one side.
        added = self.target.join(self.source, on=self.config.primary_keys, how="anti")
        removed = self.source.join(self.target, on=self.config.primary_keys, how="anti")

        # Suffix non-key columns so both sides can coexist in one frame.
        src_renamed = self.source.rename(
            {c: f"{c}_source" for c in self.source.columns if c not in self.config.primary_keys}
        )
        tgt_renamed = self.target.rename(
            {c: f"{c}_target" for c in self.target.columns if c not in self.config.primary_keys}
        )
        common = src_renamed.join(tgt_renamed, on=self.config.primary_keys, how="inner")

        match_expressions = []
        match_cols = []

        # Build one boolean match expression per comparable (non-key,
        # non-ignored, shared) column.
        for col in self.source.columns:
            if col in self.config.primary_keys or col not in self.target.columns:
                continue

            rule = self._get_effective_rule(col)
            if rule["ignore"]:
                continue

            dtype = self.source.schema[col]
            expr = self._build_match_expr(col, rule, dtype).alias(f"{col}_is_match")
            match_expressions.append(expr)
            match_cols.append(f"{col}_is_match")

        changed_count = 0
        changed_rows = pl.DataFrame()

        if match_expressions and not common.is_empty():
            # A row counts as changed when any single column fails to match.
            evaluated = common.with_columns(match_expressions)
            all_matched = pl.all_horizontal(match_cols)
            changed_rows = evaluated.filter(~all_matched)
            changed_count = changed_rows.height

        # Ratio is relative to the source row count; max(..., 1) guards
        # against division by zero on an empty source.
        mismatch_ratio = (added.height + removed.height + changed_count) / max(
            self.source.height, 1
        )
        is_match = mismatch_ratio <= self.config.threshold

        if self.config.output_path:
            out_dir = Path(self.config.output_path)
            out_dir.mkdir(parents=True, exist_ok=True)

            fmt = self.config.output_format
            ext = fmt.value

            def _export_artifact(df: pl.DataFrame, name: str) -> None:
                # Skip empty artifacts so the output directory only contains
                # files with actual differences.
                if df.height == 0:
                    return

                file_path = out_dir / f"{name}.{ext}"
                if fmt == SourceType.CSV:
                    df.write_csv(file_path)
                elif fmt == SourceType.PARQUET:
                    df.write_parquet(file_path)
                else:
                    raise NotImplementedError(
                        f"Export support for format '{fmt.value}' is not yet implemented."
                    )

            _export_artifact(added, "added_rows")
            _export_artifact(removed, "removed_rows")
            _export_artifact(changed_rows, "changed_rows")

        return DiffSummary(
            total_rows_source=self.source.height,
            total_rows_target=self.target.height,
            added_count=added.height,
            removed_count=removed.height,
            changed_count=changed_count,
            is_match=is_match,
        )

__init__(config, source_df, target_df)

Initialize the engine with datasets already aligned by the DataIngestor.

Parameters:

Name Type Description Default
config DiffConfig

The master configuration.

required
source_df DataFrame

The aligned 'Left' dataset.

required
target_df DataFrame

The aligned 'Right' dataset.

required
Source code in src/veridelta/engine.py
132
133
134
135
136
137
138
139
140
141
142
143
144
def __init__(
    self, config: DiffConfig, source_df: pl.DataFrame, target_df: pl.DataFrame
) -> None:
    """Store the configuration and the pre-aligned dataset pair.

    Args:
        config: The master configuration.
        source_df: The aligned 'Left' dataset.
        target_df: The aligned 'Right' dataset.
    """
    # Both frames are expected to have been aligned by the DataIngestor.
    self.config = config
    self.source, self.target = source_df, target_df

run()

Executes the comparison logic with safety checks and type alignment.

Source code in src/veridelta/engine.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def run(self) -> DiffSummary:
    """Executes the comparison logic with safety checks and type alignment."""
    self._validate_schema()
    self._check_uniqueness()

    # Apply explicit per-column casts (rule 'cast_to') to both sides so the
    # comparison expressions see a single dtype.
    for col in self.source.columns:
        rule = self._get_effective_rule(col)
        if rule["cast_to"]:
            # Resolve the dtype name (e.g. 'Float64') against the polars
            # namespace; unknown names are silently skipped.
            dtype = getattr(pl, rule["cast_to"], None)
            if dtype:
                if col in self.source.columns:
                    self.source = self.source.with_columns(pl.col(col).cast(dtype))
                if col in self.target.columns:
                    self.target = self.target.with_columns(pl.col(col).cast(dtype))

    # Anti-joins on the primary keys isolate rows present on only one side.
    added = self.target.join(self.source, on=self.config.primary_keys, how="anti")
    removed = self.source.join(self.target, on=self.config.primary_keys, how="anti")

    # Suffix non-key columns so both sides can coexist in one frame.
    src_renamed = self.source.rename(
        {c: f"{c}_source" for c in self.source.columns if c not in self.config.primary_keys}
    )
    tgt_renamed = self.target.rename(
        {c: f"{c}_target" for c in self.target.columns if c not in self.config.primary_keys}
    )
    common = src_renamed.join(tgt_renamed, on=self.config.primary_keys, how="inner")

    match_expressions = []
    match_cols = []

    # Build one boolean match expression per comparable (non-key,
    # non-ignored, shared) column.
    for col in self.source.columns:
        if col in self.config.primary_keys or col not in self.target.columns:
            continue

        rule = self._get_effective_rule(col)
        if rule["ignore"]:
            continue

        dtype = self.source.schema[col]
        expr = self._build_match_expr(col, rule, dtype).alias(f"{col}_is_match")
        match_expressions.append(expr)
        match_cols.append(f"{col}_is_match")

    changed_count = 0
    changed_rows = pl.DataFrame()

    if match_expressions and not common.is_empty():
        # A row counts as changed when any single column fails to match.
        evaluated = common.with_columns(match_expressions)
        all_matched = pl.all_horizontal(match_cols)
        changed_rows = evaluated.filter(~all_matched)
        changed_count = changed_rows.height

    # Ratio is relative to the source row count; max(..., 1) guards against
    # division by zero on an empty source.
    mismatch_ratio = (added.height + removed.height + changed_count) / max(
        self.source.height, 1
    )
    is_match = mismatch_ratio <= self.config.threshold

    if self.config.output_path:
        out_dir = Path(self.config.output_path)
        out_dir.mkdir(parents=True, exist_ok=True)

        fmt = self.config.output_format
        ext = fmt.value

        def _export_artifact(df: pl.DataFrame, name: str) -> None:
            # Skip empty artifacts so the output directory only contains
            # files with actual differences.
            if df.height == 0:
                return

            file_path = out_dir / f"{name}.{ext}"
            if fmt == SourceType.CSV:
                df.write_csv(file_path)
            elif fmt == SourceType.PARQUET:
                df.write_parquet(file_path)
            else:
                raise NotImplementedError(
                    f"Export support for format '{fmt.value}' is not yet implemented."
                )

        _export_artifact(added, "added_rows")
        _export_artifact(removed, "removed_rows")
        _export_artifact(changed_rows, "changed_rows")

    return DiffSummary(
        total_rows_source=self.source.height,
        total_rows_target=self.target.height,
        added_count=added.height,
        removed_count=removed.height,
        changed_count=changed_count,
        is_match=is_match,
    )

LoaderFactory

Factory to return the appropriate loader based on SourceType.

Source code in src/veridelta/engine.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class LoaderFactory:
    """Factory to return the appropriate loader based on SourceType."""

    # Registry of shared loader instances, keyed by source format.
    _loaders: dict[SourceType, BaseLoader] = {
        SourceType.CSV: CSVLoader(),
        SourceType.PARQUET: ParquetLoader(),
    }

    @classmethod
    def get_loader(cls, source_type: SourceType) -> BaseLoader:
        """Returns the loader for the given type."""
        if (loader := cls._loaders.get(source_type)) is not None:
            return loader
        raise NotImplementedError(
            f"Support for '{source_type}' is planned but not yet implemented."
        )

get_loader(source_type) classmethod

Returns the loader for the given type.

Source code in src/veridelta/engine.py
57
58
59
60
61
62
63
64
65
@classmethod
def get_loader(cls, source_type: SourceType) -> BaseLoader:
    """Returns the loader for the given type."""
    # Unknown formats raise rather than returning None so callers fail fast.
    if (loader := cls._loaders.get(source_type)) is not None:
        return loader
    raise NotImplementedError(
        f"Support for '{source_type}' is planned but not yet implemented."
    )

ParquetLoader

Bases: BaseLoader

Loader for Parquet files using Polars.

Source code in src/veridelta/engine.py
41
42
43
44
45
46
class ParquetLoader(BaseLoader):
    """Loader for Parquet files using Polars."""

    def load(self, config: SourceConfig) -> pl.DataFrame:
        """Load the Parquet file at ``config.path`` into a Polars DataFrame."""
        # Any extra reader options are forwarded verbatim to read_parquet.
        return pl.read_parquet(config.path, **config.options)

load(config)

Load the Parquet file described by the given config into a Polars DataFrame.

Source code in src/veridelta/engine.py
44
45
46
def load(self, config: SourceConfig) -> pl.DataFrame:
    """Load the Parquet file at ``config.path`` into a Polars DataFrame."""
    # Any extra reader options are forwarded verbatim to read_parquet.
    return pl.read_parquet(config.path, **config.options)