Dataflow

The data ingestion platform writes data to different layers, following the Microsoft Delta Lake architecture. The only job of the sdp-package is to automate the first two layers: bronze and silver. The metadata needed to correctly ingest the data into those layers is specified in the YAML specification.

The diagram below depicts the dataflow as a high-level overview:

                                 ┌──────────────┐
                                 │              │
                             ┌───►    BRONZE    │
                             │   │              │
           ┌──────────────┐  │   └──────────────┘
           │              │  │
snapshot───►  PREPROCESS  ├──┤
           │              │  │
           └──────────────┘  │   ┌──────────────┐
                             │   │              │
                             └───►    SILVER    │
                                 │              │
                                 └──────────────┘

Preprocess

The raw data first travels to a preprocess stage. This stage performs some rudimentary operations on the dataset:

  1. Flatten

XML and JSON data can be nested, and nested data is often difficult for downstream databases to handle. The preprocess stage flattens the data so that it can be safely stored in the bronze/silver layers.
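For intuition, here is a minimal sketch of such a flattening step. The function and the "__" separator are illustrative assumptions, not the actual sdp-package code; the separator matches the column_4__a / column_4__b naming that appears in the example tables.

```python
def flatten(record, parent_key="", sep="__"):
    """Recursively flatten a nested dict into a single-level dict,
    joining nested keys with `sep`."""
    items = {}
    for key, value in record.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # recurse into nested objects, carrying the prefix along
            items.update(flatten(value, new_key, sep))
        else:
            items[new_key] = value
    return items

# {"a": 1, "b": 2} nested under column_4 becomes column_4__a / column_4__b
print(flatten({"column_1": 1, "column_4": {"a": 1, "b": 2}}))
# {'column_1': 1, 'column_4__a': 1, 'column_4__b': 2}
```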

  2. Select/exclude

Sometimes you only want to read a subset of the data. The YAML specification allows you to select and exclude columns from the original dataset. Filtering the columns in the preprocess stage ensures that only the selected data appears in bronze/silver.
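Conceptually, the column filtering amounts to something like the following. This is a hypothetical sketch, not the real implementation; only the select/exclude directives come from the YAML specification.

```python
def filter_columns(columns, select=None, exclude=None):
    """Keep only the selected columns, then drop the excluded ones."""
    if select is not None:
        columns = [c for c in columns if c in select]
    if exclude is not None:
        columns = [c for c in columns if c not in exclude]
    return columns

print(filter_columns(["column_1", "column_2", "column_3"], exclude=["column_3"]))
# ['column_1', 'column_2']
```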

  3. Set datatypes

The preprocess stage is also responsible for determining the datatypes. First, it checks the YAML specification to see whether a datatype has been fixed for a certain column. These explicitly set datatypes always take precedence. Then, for the remaining (non-specified) columns, it looks at the value of default_datatype, which can be set to infer or string. If set to infer, which is the default, the platform tries to automatically determine the datatype. If the value of default_datatype is string, all remaining columns are converted to StringType by default.
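The resolution order described above can be sketched as follows. The helper is hypothetical; only the precedence rules come from the specification.

```python
def resolve_datatype(column, explicit_types, default_datatype="infer", inferred_types=None):
    """Pick the datatype for a column following the precedence rules:
    explicit YAML datatype > default_datatype setting > inferred type."""
    if column in explicit_types:          # explicitly set datatypes always win
        return explicit_types[column]
    if default_datatype == "string":      # force all remaining columns to string
        return "string"
    # default_datatype == "infer": use the automatically determined type,
    # falling back to string when inference found nothing
    return (inferred_types or {}).get(column, "string")
```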

  4. Add metadata

The following columns are added:

  • __sdp_file_name: the file(s) used to ingest this data.

  • __sdp_ingest_timestamp: the timestamp at which the ingestion started (in human-readable format)

  • __sdp_ingest_unix_timestamp: the timestamp at which the ingestion started (as a Unix timestamp - integer)
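A sketch of what adding these columns amounts to for a single record. This is a plain-Python illustration; the real pipeline performs the equivalent operation on Spark DataFrames.

```python
from datetime import datetime, timezone

def add_metadata(row, file_name, ingest_time=None):
    """Attach the __sdp_* metadata columns to a record."""
    # ingest_time defaults to "now"; pass a fixed datetime for reproducibility
    ingest_time = ingest_time or datetime.now(timezone.utc)
    return {
        **row,
        "__sdp_file_name": file_name,
        "__sdp_ingest_timestamp": str(ingest_time),                   # human-readable
        "__sdp_ingest_unix_timestamp": int(ingest_time.timestamp()),  # integer
    }
```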

  5. Apply data masking rules

Data masking should be applied as early in the process as possible, since no sensitive data may be exposed in the bronze or silver layer.
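The fixed-width masks shown in the example tables (********) could come from a rule like the following. This is a hypothetical sketch; the actual masking rules and their configuration are not shown here.

```python
def mask_value(value, mask_char="*", width=8):
    """Replace a sensitive value with a fixed-width mask so that neither
    the content nor the length of the original value leaks."""
    return mask_char * width

print(mask_value("sensitive"))  # ********
```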

  6. Remove trailing spaces

The code iterates through the columns of a DataFrame and trims any string columns by removing leading and trailing whitespace.
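Per record, that trimming is equivalent to the following plain-Python version of the DataFrame operation (an illustrative sketch, not the actual code):

```python
def trim_strings(row):
    """Strip leading and trailing whitespace from every string value;
    non-string values pass through unchanged."""
    return {key: value.strip() if isinstance(value, str) else value
            for key, value in row.items()}

print(trim_strings({"column_1": 1, "column_2": "  padded  "}))
# {'column_1': 1, 'column_2': 'padded'}
```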

Example snapshot data

| column_1 | column_2           | column_3  | column_4   |
|----------|--------------------|-----------|------------|
| 1        | this is some data  | sensitive | {a:1, b:2} |
| 2        | this is other data | sensitive | {a:3, b:4} |

Example snapshot data after the preprocess stage.

| column_1 | column_2           | column_3 | column_4__a | column_4__b | __sdp_file_name   | __sdp_ingest_timestamp     | __sdp_ingest_unix_timestamp |
|----------|--------------------|----------|-------------|-------------|-------------------|----------------------------|-----------------------------|
| 1        | this is some data  | ******** | 1           | 2           | /path/to/file.csv | 2022-04-06 11:05:11.056383 | 1649235939                  |
| 2        | this is other data | ******** | 3           | 4           | /path/to/file.csv | 2022-04-06 11:05:11.056383 | 1649235939                  |

Bronze (10)

The bronze zone in the data lake is the raw data zone and serves as the prototypical lake, where massive amounts of data are stored. The data is neither filtered, modified, nor deduplicated. This means that we must be able to retrieve every record we ever read into the bronze layer in a 1-to-1 fashion. Typically, error-prone parsing of data is avoided: data should be as raw as possible.

The bronze zone tracks all the history which makes it possible to rebuild entire historic views in the silver and gold zone. This also makes it possible to go back in time to a certain moment and query a specific view of the data.
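In other words, writing to bronze is a pure append. A minimal sketch, with plain lists standing in for Delta tables:

```python
def append_to_bronze(bronze_rows, new_rows):
    """Bronze is append-only: every ingest run adds all records unchanged,
    so duplicates across runs are expected and history is never lost."""
    return bronze_rows + list(new_rows)
```

Because nothing is deduplicated, ingesting the same snapshot twice simply doubles the row count.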

Suppose that we ingest the data from snapshot_table two times. This would result in the following data in the bronze layer:

Example of bronze after reading the same data two times.

| column_1 | column_2           | column_3 | column_4__a | column_4__b | __sdp_file_name   | __sdp_ingest_timestamp     | __sdp_ingest_unix_timestamp |
|----------|--------------------|----------|-------------|-------------|-------------------|----------------------------|-----------------------------|
| 1        | this is some data  | ******** | 1           | 2           | /path/to/file.csv | 2022-04-06 11:05:11.056383 | 1649235939                  |
| 2        | this is other data | ******** | 3           | 4           | /path/to/file.csv | 2022-04-06 11:05:11.056383 | 1649235939                  |
| 1        | this is some data  | ******** | 1           | 2           | /path/to/file.csv | 2022-04-06 11:10:12.056383 | 1649237000                  |
| 2        | this is other data | ******** | 3           | 4           | /path/to/file.csv | 2022-04-06 11:10:12.056383 | 1649237000                  |

Silver (20)

The silver data layer provides an accurate view of the current/latest data for each dataset. It determines what is latest by using the keys directive in the YAML specification. Data is queryable for easy debugging and ready for consumption - mainly for system integrations, data analysts, or data science purposes (the data is still in its native structure). All modifications to the silver table are logged using a built-in feature called Change Data Feed. This history table (also called table_changes) shows how the data of the silver table changes through time, which can be incredibly useful. We will now show how the silver table and the table_changes table evolve after ingesting multiple datasets consecutively.

Suppose that we first ingest the following data:

Sample snapshot to load in silver

| column_1 | column_2 |
|----------|----------|
| 1        | data_1   |
| 2        | data_2   |
| 3        | data_3   |

We do this using the following YAML specification.

name: test_docs
sources:
- type: staging
  name: staging-source

entities:
- name: entity_1
  source:
    name: staging-source
    config:
      spark_read_options:
        file_type: CSV
        delimiter: ;
        charset: UTF-8
        header: True
  settings:
    keys:
    - column_1

This results in the following silver table (select * from 20_test_docs.entity_1):

Silver after first ingest

| column_1 | column_2 |
|----------|----------|
| 1        | data_1   |
| 2        | data_2   |
| 3        | data_3   |

Also, when we query the table_changes (SELECT * FROM table_changes('20_test_docs.entity_1', 0)):

Silver table_changes after first ingest

| column_1 | column_2 | _change_type | _commit_version | _commit_timestamp            |
|----------|----------|--------------|-----------------|------------------------------|
| 1        | data_1   | insert       | 0               | 2022-04-06T10:59:24.000+0000 |
| 2        | data_2   | insert       | 0               | 2022-04-06T10:59:24.000+0000 |
| 3        | data_3   | insert       | 0               | 2022-04-06T10:59:24.000+0000 |

Next, we’re going to ingest a new dataset:

Second snapshot to load in silver

| column_1 | column_2      |
|----------|---------------|
| 2        | data_modified |
| 3        | data_3        |
| 4        | data_added    |

Note that we've removed the first row, modified the second row, kept the third row unchanged, and added a fourth row.

This results in the following silver table (select * from 20_test_docs.entity_1):

Silver after second ingest

| column_1 | column_2      |
|----------|---------------|
| 2        | data_modified |
| 3        | data_3        |
| 4        | data_added    |

When we query the table_changes now, we get an overview of the records that have been changed:

Silver table_changes after second ingest

| column_1 | column_2      | _change_type     | _commit_version | _commit_timestamp            |
|----------|---------------|------------------|-----------------|------------------------------|
| 1        | data_1        | insert           | 0               | 2022-04-06T10:59:24.000+0000 |
| 2        | data_2        | insert           | 0               | 2022-04-06T10:59:24.000+0000 |
| 3        | data_3        | insert           | 0               | 2022-04-06T10:59:24.000+0000 |
| 2        | data_2        | update_preimage  | 1               | 2022-04-06T11:05:02.000+0000 |
| 2        | data_modified | update_postimage | 1               | 2022-04-06T11:05:02.000+0000 |
| 4        | data_added    | insert           | 1               | 2022-04-06T11:05:02.000+0000 |
| 1        | data_1        | delete           | 2               | 2022-04-06T11:05:05.000+0000 |

Note

The keys not available in the latest snapshot are deleted from the silver layer. You can override this behaviour by setting silver_type as specified here.
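The default merge behaviour walked through above can be modelled in plain Python. This is a hypothetical sketch; the platform actually performs a Delta merge keyed on the keys directive.

```python
def merge_snapshot(current, snapshot, keys):
    """Key-based full-snapshot merge: new keys are inserted, matching keys
    with different values are updated, and keys absent from the snapshot
    are deleted (the default behaviour)."""
    def key_of(row):
        return tuple(row[k] for k in keys)

    current_by_key = {key_of(r): r for r in current}
    snapshot_keys = {key_of(r) for r in snapshot}

    changes = []
    for row in snapshot:
        k = key_of(row)
        if k not in current_by_key:
            changes.append(("insert", row))
        elif current_by_key[k] != row:
            changes.append(("update", row))
    for k, row in current_by_key.items():
        if k not in snapshot_keys:
            changes.append(("delete", row))
    # the snapshot itself becomes the new silver view
    return snapshot, changes

# mirrors the two example ingests above
current = [{"column_1": 1, "column_2": "data_1"},
           {"column_1": 2, "column_2": "data_2"},
           {"column_1": 3, "column_2": "data_3"}]
snapshot = [{"column_1": 2, "column_2": "data_modified"},
            {"column_1": 3, "column_2": "data_3"},
            {"column_1": 4, "column_2": "data_added"}]
silver, changes = merge_snapshot(current, snapshot, ["column_1"])
```

The resulting change list (one update, one insert, one delete) matches the update_preimage/update_postimage, insert, and delete rows shown in the table_changes example.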

Divergent Delta Lake Architecture: Accelerated Approach

The Delta Lake architecture implemented in our system deviates slightly from the traditional approach. In a traditional architecture, the data flows through each layer (from Bronze to Silver to Gold), incrementally and progressively improving in structure and quality. In our adapted approach, the data is loaded into the Bronze and Silver layers simultaneously: the Bronze layer maintains a full data dump, while the Silver layer holds only the latest/current data. The primary motivation behind this divergent architecture is speed. By avoiding the need to load data into the Bronze layer before transforming it into the Silver layer, we achieve faster data processing and minimize latency.

By implementing this approach, we have chosen not to perform transformations directly from the Bronze to the Silver layer. Instead, we have adopted a workflow that leverages notebooks for transformations, allowing each team to maintain ownership of their work. These transformations can be performed from the Bronze or Silver Layer to the Gold layer.

Lastly, while the Delta Lake architecture typically allows the Silver layer to be built up from the Bronze layer automatically, our implementation requires a manual action for this process. As a result, we take backups of the Silver layer to mitigate the slight disadvantage of lacking an automated build-up mechanism. These backups ensure data reliability and provide a fallback option in case of any unexpected data inconsistencies or errors.