Data Pipeline Atlas 6 ETL: CDC Mode Structure

Andi Lamprecht · 3 min read · Accepted
ADR-0059 · Author: Sybil Melton · Date: 2025-02-07 · Products: platform
Originally ADR-0065 DATA_PIPELINE_ATLAS-6-ETL_CDCModeStructure (v7) · Source on Confluence ↗

Context

The Atlas data pipeline must support Change Data Capture (CDC) mode as its default operational mode.
In this mode the pipeline processes only recent changes.

Change processing must be done in a way that covers two actions:

  • Reading recent changes
  • Propagating recent changes

Decision

Delta Lake has built-in CDC support through its Change Data Feed (CDF) functionality. When a table has the change data feed option enabled, Delta Lake stores change information for each row as ‘_change_type’, ‘timestamp’, and ‘version’ metadata. This metadata is applied automatically by the Delta Lake format to each changed row according to the following rules:

  • version = max(version) + 1
  • timestamp = merge_action_timestamp

The example below shows how a version change is processed internally by Delta Lake. The change data feed output references all the changes applied to the table while it moved from version 1 to version 2.

[Diagram unavailable: change data feed output for the version 1 → 2 transition]
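In lieu of the original diagram, the versioning rules can be illustrated with a small pure-Python sketch. This is illustrative only — Delta Lake computes this metadata internally — and the function and row data are hypothetical:

```python
import time

# Sketch of the rules above: every row touched by a merge action gets
#   version   = max(version) + 1
#   timestamp = merge_action_timestamp
# (In the actual Delta CDF output these appear as _commit_version and
# _commit_timestamp columns; this ADR abbreviates them.)

def apply_merge(existing_versions, changed_rows):
    """Stamp changed rows with the next table version and the merge timestamp."""
    next_version = max(existing_versions) + 1
    merge_action_timestamp = time.time()
    return [
        dict(row, version=next_version, timestamp=merge_action_timestamp)
        for row in changed_rows
    ]

# A table currently at version 1 receives an update: the changed row is
# stamped with version 2.
rows = apply_merge([0, 1], [{"id": 1, "_change_type": "update_postimage"}])
```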

Reading only recent changes from a Delta Lake input table

[Code snippet unavailable: reading the change data feed above last_processed_version]

This reads only the rows whose attached version is greater than last_processed_version. A last_processed_version value should be bound to each of the pipeline’s Delta table inputs and stored in the pipeline metadata.
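A minimal sketch of this filtering rule in pure Python. In the real pipeline this would be a Spark read of the Delta change data feed (the commented options are the documented CDF read options); the rows and helper below are illustrative:

```python
# In Spark, the read described above would look roughly like:
#   spark.read.format("delta") \
#        .option("readChangeFeed", "true") \
#        .option("startingVersion", last_processed_version + 1) \
#        .table("table_1")
# Here a list of dicts stands in for the change feed rows.

change_feed = [
    {"id": 10, "_change_type": "insert", "version": 2},
    {"id": 11, "_change_type": "update_postimage", "version": 3},
    {"id": 12, "_change_type": "delete", "version": 4},
]

def recent_changes(rows, last_processed_version):
    """Keep only rows whose version is greater than the stored bookmark."""
    return [r for r in rows if r["version"] > last_processed_version]

# With last_processed_version = 2, only the version 3 and 4 changes remain.
recent = recent_changes(change_feed, 2)
```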

Storage of last_processed_version

Each Atlas pipeline must have a pipelines/<pipeline_name>/bookmarks directory created in the GCP bucket. For each Delta table input, that directory should contain a file, named after the input table, that stores the last read version value. The pipeline reads this file during execution and updates it on successful runs.

For example, if the pipeline has two inputs (Delta tables table_1 and table_2), its bookmarks folder should look as follows:

pipelines/
├── <pipeline_name>/
    ├── bookmarks/
        ├── table_1 <--- 2
        ├── table_2 <--- 31

The example above states that the last read version of table_1 was 2, and that of table_2 was 31.

When <pipeline_name> runs, those inputs are queried for changes above those versions. If no changes have been made to a table, nothing happens.
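The bookmark scheme above can be sketched as follows, with a local directory standing in for the GCS bucket and hypothetical helper names (a production version would go through a GCS client instead of the local filesystem):

```python
import pathlib
import tempfile

def read_bookmark(bucket_root, pipeline_name, table_name):
    """Return the last read version for a table input, or -1 if no
    bookmark file exists yet (e.g. on the first pipeline run)."""
    path = pathlib.Path(bucket_root, "pipelines", pipeline_name,
                        "bookmarks", table_name)
    if not path.exists():
        return -1
    return int(path.read_text().strip())

def write_bookmark(bucket_root, pipeline_name, table_name, version):
    """Persist the last read version after a successful pipeline run."""
    path = pathlib.Path(bucket_root, "pipelines", pipeline_name,
                        "bookmarks", table_name)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(str(version))

# Reproduce the folder layout from the example above.
root = tempfile.mkdtemp()
write_bookmark(root, "my_pipeline", "table_1", 2)
write_bookmark(root, "my_pipeline", "table_2", 31)
```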

Consequences

  1. Each new Delta Lake table must be created with CDF mode turned on.
  2. Each step of the pipeline processes only the changes pushed by the previous step.
  3. Each pipeline has a bookmarks folder on GCS where it stores the last_read_version for each of its input tables.
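For consequence 1, Delta Lake exposes CDF as a table property. A sketch of the DDL — the table name and schema here are illustrative, the property name is Delta Lake’s documented one:

```sql
-- Create a new table with the change data feed enabled
CREATE TABLE table_1 (id BIGINT, payload STRING)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Or enable it on an existing table
ALTER TABLE table_1 SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
```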

Alternatives Considered

  1. Store bookmarks in the table.

Pros:

  • Standard storing solution

Cons:

  • Pipelines usually have few inputs. The benefit gained from creating a bookmarks table might be negligible, and it introduces another layer of complexity.

  2. Store bookmarks in a KV in-memory store (e.g. Redis).

  • Same as alternative 1; no need to pay extra license costs.