Dataflow
The data ingestion platform writes data to different layers, following the Microsoft Delta Lake Architecture. The only job of the sdp-package is to automate the first two layers: bronze and silver. The metadata needed to correctly ingest the data into those layers is specified in the YAML specification.
The diagram below depicts the dataflow as a high-level overview:
```
                                  ┌──────────────┐
                                  │              │
                             ┌───►│    BRONZE    │
                             │    │              │
         ┌──────────────┐    │    └──────────────┘
         │              │    │
snapshot─►  PREPROCESS  ├────┤
         │              │    │
         └──────────────┘    │    ┌──────────────┐
                             │    │              │
                             └───►│    SILVER    │
                                  │              │
                                  └──────────────┘
```
Preprocess
The raw data first travels to a preprocess stage. This stage performs some rudimentary operations on the dataset:
Flatten
XML and JSON data can be nested. This nested data is often difficult to handle by downstream databases. The preprocess stage flattens the data so that it can be safely stored in the bronze/silver layers.
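The flattening logic itself lives inside the sdp-package, but the idea can be illustrated with a small standalone sketch. The function name and the `__` separator are assumptions for illustration (the separator matches the `column_4__a` / `column_4__b` naming seen in the examples further down):

```python
def flatten(record: dict, parent_key: str = "", sep: str = "__") -> dict:
    """Recursively flatten a nested dict, joining nested keys with `sep`."""
    flat = {}
    for key, value in record.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, new_key, sep))
        else:
            flat[new_key] = value
    return flat

row = {"column_1": 1, "column_4": {"a": 1, "b": 2}}
print(flatten(row))  # {'column_1': 1, 'column_4__a': 1, 'column_4__b': 2}
```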
Select/exclude
Sometimes you just want to read a subset of the data. The YAML specification allows to select and exclude some columns from the original dataset. Filtering the columns in the preprocess-stages makes sure that only the selected data appears in bronze/silver.
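The select/exclude behaviour amounts to a simple column filter. A minimal sketch (the helper name and signature are hypothetical, not the sdp-package API):

```python
def apply_column_filter(columns, select=None, exclude=None):
    """Return only the columns that survive the select/exclude rules."""
    if select is not None:
        columns = [c for c in columns if c in select]
    if exclude is not None:
        columns = [c for c in columns if c not in exclude]
    return columns

print(apply_column_filter(["a", "b", "c"], exclude=["b"]))  # ['a', 'c']
```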
Set datatypes
The preprocess stage is also responsible for determining the datatypes. First, it checks the YAML specification to see whether a datatype has been fixed for a certain column. These explicitly set datatypes always take precedence. Then, for the remaining (non-specified) columns, it looks at the value of `default_datatype`, which can be set to `infer` or `string`. If set to `infer`, which is the default, the platform tries to automatically determine the datatype. If the value of `default_datatype` is `string`, then all remaining columns are converted to `StringType` by default.
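The precedence rules can be sketched as a small resolver. This is an illustrative sketch only (the function name is hypothetical, and the `"inferred"` return value stands in for Spark's actual schema inference):

```python
def resolve_datatype(column, explicit_types, default_datatype="infer"):
    """Decide which datatype a column gets: explicit YAML types always win;
    otherwise fall back to the default_datatype setting."""
    if column in explicit_types:
        return explicit_types[column]
    if default_datatype == "string":
        return "string"
    return "inferred"  # placeholder for Spark's automatic type inference

explicit = {"column_1": "integer"}
print(resolve_datatype("column_1", explicit))            # integer
print(resolve_datatype("column_2", explicit))            # inferred
print(resolve_datatype("column_2", explicit, "string"))  # string
```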
Add metadata
Add the following columns:
- `__sdp_file_name`: which files were used to ingest this data.
- `__sdp_ingest_timestamp`: the timestamp at which the ingestion started (in human-readable format).
- `__sdp_ingest_unix_timestamp`: the timestamp at which the ingestion started (as a Unix timestamp, integer).
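The metadata columns above can be sketched in plain Python over a list of row dicts (the real implementation adds them as Spark DataFrame columns; the helper name is hypothetical):

```python
from datetime import datetime

def add_metadata(rows, file_name):
    """Attach the three __sdp_* metadata columns to every row, using the
    time the ingestion started."""
    started = datetime.now()
    for row in rows:
        row["__sdp_file_name"] = file_name
        row["__sdp_ingest_timestamp"] = str(started)
        row["__sdp_ingest_unix_timestamp"] = int(started.timestamp())
    return rows
```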
Apply data masking rules
Data masking should be applied as early in the process as possible, since no sensitive data should be exposed in the bronze or silver layer.
Remove trailing spaces
The code iterates through the columns of a DataFrame and trims any string columns by removing leading and trailing whitespace.
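In Spark this trimming is applied per string column of the DataFrame; the same idea can be shown in a plain-Python sketch over a list of row dicts (illustrative only, not the sdp-package code):

```python
def trim_strings(rows):
    """Strip leading and trailing whitespace from every string value;
    non-string values pass through unchanged."""
    return [
        {k: v.strip() if isinstance(v, str) else v for k, v in row.items()}
        for row in rows
    ]

print(trim_strings([{"column_1": 1, "column_2": "  this is some data  "}]))
```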
As an example, suppose the snapshot contains the following data:

| column_1 | column_2 | column_3 | column_4 |
|---|---|---|---|
| 1 | this is some data | sensitive | {a:1, b:2} |
| 2 | this is other data | sensitive | {a:3, b:4} |

After the preprocess stage, the data looks as follows:

| column_1 | column_2 | column_3 | column_4__a | column_4__b | __sdp_file_name | __sdp_ingest_timestamp | __sdp_ingest_unix_timestamp |
|---|---|---|---|---|---|---|---|
| 1 | this is some data | ******** | 1 | 2 | /path/to/file.csv | 2022-04-06 11:05:11.056383 | 1649235939 |
| 2 | this is other data | ******** | 3 | 4 | /path/to/file.csv | 2022-04-06 11:05:11.056383 | 1649235939 |
Bronze (10)
The bronze zone in the data lake is the raw data zone and serves as the prototypical lake, where massive amounts of data are stored. The data is neither filtered, modified, nor deduplicated. This means that we must be able to retrieve all records that we ever read into the bronze layer in a 1-to-1 fashion. Typically, error-prone parsing of data is avoided. Data should be as raw as possible.
The bronze zone tracks all the history which makes it possible to rebuild entire historic views in the silver and gold zone. This also makes it possible to go back in time to a certain moment and query a specific view of the data.
Suppose that we ingest the data from snapshot_table two times. This would result in the following data in the bronze layer:
| column_1 | column_2 | column_3 | column_4__a | column_4__b | __sdp_file_name | __sdp_ingest_timestamp | __sdp_ingest_unix_timestamp |
|---|---|---|---|---|---|---|---|
| 1 | this is some data | ******** | 1 | 2 | /path/to/file.csv | 2022-04-06 11:05:11.056383 | 1649235939 |
| 2 | this is other data | ******** | 3 | 4 | /path/to/file.csv | 2022-04-06 11:05:11.056383 | 1649235939 |
| 1 | this is some data | ******** | 1 | 2 | /path/to/file.csv | 2022-04-06 11:10:12.056383 | 1649237000 |
| 2 | this is other data | ******** | 3 | 4 | /path/to/file.csv | 2022-04-06 11:10:12.056383 | 1649237000 |
Silver (20)
The silver data layer provides an accurate view of the current/latest data for each dataset. It determines what is latest by using the `keys` directive in the YAML specification.
Data is queryable for easy debugging and ready for consumption - mainly for system integrations, data analysts, or data science purposes (the data is still in the native structure).
All modifications on the silver table are logged using a built-in feature called Change Data Feed.
This history (also called table_changes) table shows how the data of the silver table changes through time, which can be incredibly useful.
We will now show how the silver table and the table_changes table evolve after ingesting multiple datasets consecutively.
Suppose that we first ingest the following data:
| column_1 | column_2 |
|---|---|
| 1 | data_1 |
| 2 | data_2 |
| 3 | data_3 |
We do this using the following YAML specification:

```yaml
name: test_docs
sources:
  - type: staging
    name: staging-source
entities:
  - name: entity_1
    source:
      name: staging-source
    config:
      spark_read_options:
        file_type: CSV
        delimiter: ;
        charset: UTF-8
        header: True
    settings:
      keys:
        - column_1
```
This results in the following silver table (`select * from 20_test_docs.entity_1`):

| column_1 | column_2 |
|---|---|
| 1 | data_1 |
| 2 | data_2 |
| 3 | data_3 |
Also, when we query the table_changes (`SELECT * FROM table_changes('20_test_docs.entity_1', 0)`):

| column_1 | column_2 | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|
| 1 | data_1 | insert | 0 | 2022-04-06T10:59:24.000+0000 |
| 2 | data_2 | insert | 0 | 2022-04-06T10:59:24.000+0000 |
| 3 | data_3 | insert | 0 | 2022-04-06T10:59:24.000+0000 |
Next, we’re going to ingest a new dataset:
| column_1 | column_2 |
|---|---|
| 2 | data_modified |
| 3 | data_3 |
| 4 | data_added |
Note that we’ve removed the first row, modified the second row, kept the third row unchanged, and added a fourth row.

This results in the following silver table (`select * from 20_test_docs.entity_1`):

| column_1 | column_2 |
|---|---|
| 2 | data_modified |
| 3 | data_3 |
| 4 | data_added |
When we query the table_changes now, we get an overview of the records that have been changed:

| column_1 | column_2 | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|
| 1 | data_1 | insert | 0 | 2022-04-06T10:59:24.000+0000 |
| 2 | data_2 | insert | 0 | 2022-04-06T10:59:24.000+0000 |
| 3 | data_3 | insert | 0 | 2022-04-06T10:59:24.000+0000 |
| 2 | data_2 | update_preimage | 1 | 2022-04-06T11:05:02.000+0000 |
| 2 | data_modified | update_postimage | 1 | 2022-04-06T11:05:02.000+0000 |
| 4 | data_added | insert | 1 | 2022-04-06T11:05:02.000+0000 |
| 1 | data_1 | delete | 2 | 2022-04-06T11:05:05.000+0000 |
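The way a new snapshot is classified into inserts, updates, and deletes can be sketched in plain Python. This is an illustrative sketch only: the platform itself performs a Delta Lake MERGE keyed on the `keys` directive, and the change types are recorded by Change Data Feed.

```python
def diff_snapshot(silver, snapshot, keys):
    """Classify the changes a new snapshot causes against the current
    silver rows: new keys are inserts, changed rows are updates, and
    keys missing from the snapshot are deletes."""
    key = lambda row: tuple(row[k] for k in keys)
    old = {key(r): r for r in silver}
    new = {key(r): r for r in snapshot}
    changes = []
    for k, row in new.items():
        if k not in old:
            changes.append(("insert", row))
        elif old[k] != row:
            changes.append(("update", row))
    for k, row in old.items():
        if k not in new:
            changes.append(("delete", row))
    return changes

silver = [{"column_1": 1, "column_2": "data_1"},
          {"column_1": 2, "column_2": "data_2"},
          {"column_1": 3, "column_2": "data_3"}]
snapshot = [{"column_1": 2, "column_2": "data_modified"},
            {"column_1": 3, "column_2": "data_3"},
            {"column_1": 4, "column_2": "data_added"}]
for change_type, row in diff_snapshot(silver, snapshot, keys=["column_1"]):
    print(change_type, row["column_1"])  # update 2 / insert 4 / delete 1
```

Running this on the example data reproduces the changes seen in the table_changes output: an update for key 2, an insert for key 4, and a delete for key 1 (key 3 is unchanged, so it produces no change record).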
Note
Keys that are not available in the latest snapshot are deleted from the silver layer. You can override this behaviour by setting `silver_type` as specified here.
Divergent Delta Lake Architecture: Accelerated Approach
The Delta Lake architecture implemented in our system deviates slightly from the traditional approach. In a traditional architecture, the data flows through each layer of the architecture (from Bronze to Silver to Gold), while incrementally and progressively improving the structure and quality of data. In our adapted approach, the data is simultaneously loaded into the Bronze and Silver layers, where the Bronze layer maintains a full data dump and the Silver layer only the latest/current data. The primary motivation behind this divergent architecture is speed optimization. By avoiding the need to load data into the Bronze layer before transforming it to the Silver layer, we can achieve faster data processing and minimize latency.
By implementing this approach, we have chosen not to perform transformations directly from the Bronze to the Silver layer. Instead, we have adopted a workflow that leverages notebooks for transformations, allowing each team to maintain ownership of their work. These transformations can be performed from the Bronze or Silver layer to the Gold layer.
Lastly, while the Delta Lake architecture typically allows the Silver layer to be built up from the Bronze layer automatically, our implementation requires a manual action for this process. As a result, we take backups of the Silver layer to mitigate the slight disadvantage of lacking an automated build-up mechanism. These backups ensure data reliability and provide a fallback option in case of any unexpected data inconsistencies or errors.