Dataflow
========

The data ingestion platform writes data to different `layers`, following the
`Microsoft Delta Lake Architecture `_. The only job of the `sdp`-package is to
automate the first two layers: `bronze` and `silver`. The metadata that is
needed to correctly ingest the data into those layers is specified in the
`yaml`-specification.

The diagram below depicts the dataflow as a high-level overview::

                                     ┌──────────────┐
                                     │              │
                                 ┌───►    BRONZE    │
                                 │   │              │
              ┌──────────────┐   │   └──────────────┘
              │              │   │
   snapshot───►  PREPROCESS  ├──┤
              │              │   │
              └──────────────┘   │   ┌──────────────┐
                                 │   │              │
                                 └───►    SILVER    │
                                     │              │
                                     └──────────────┘

Preprocess
----------

The raw data first travels to a `preprocess`-stage. This stage performs some
rudimentary operations on the dataset:

1. **Flatten**

   XML and JSON data can be nested. Nested data is often difficult for
   downstream databases to handle. The preprocess stage flattens the data so
   that it can be safely stored in the bronze/silver layers.

2. **Select/exclude**

   Sometimes you only want to read a subset of the data. The YAML
   specification allows you to select and exclude columns from the original
   dataset. Filtering the columns in the `preprocess`-stage makes sure that
   only the selected data appears in bronze/silver.

3. **Set datatypes**

   The preprocess stage is also responsible for determining the datatypes.
   First, it checks the YAML specification to see whether a datatype has been
   `fixed` for a certain column. These explicitly set datatypes always take
   precedence. Then, for the remaining (non-specified) columns, it looks at
   the value of ``default_datatype``, which can be set to ``infer`` or
   ``string``. If set to ``infer``, which is the default, the platform tries
   to automatically determine the datatype. If the value of
   ``default_datatype`` is ``string``, then all columns are converted to
   ``StringType`` by default.

4. **Add metadata**

   Add the following columns:

   - `__sdp_file_name`: which files were used to ingest this data.
   - `__sdp_ingest_timestamp`: the timestamp at which the ingestion started
     (in human-readable format)
   - `__sdp_ingest_unix_timestamp`: the timestamp at which the ingestion
     started (in unix-timestamp format - integer)

5. **Apply data masking rules**

   Data masking should be applied as early in the process as possible, since
   no sensitive data should be exposed in the bronze or silver layer.

6. **Remove trailing spaces**

   The code iterates through the columns of a DataFrame and trims any string
   columns by removing leading and trailing whitespace.

.. _snapshot_table:

.. csv-table:: Example `snapshot` data
   :header: "column_1", "column_2", "column_3", "column_4"
   :align: center

   1, "this is some data", "sensitive", "{a:1, b:2}"
   2, "this is other data", "sensitive", "{a:3, b:4}"

.. _preprocess_table:

.. csv-table:: Example `snapshot` data **after** the preprocess stage.
   :header: "column_1", "column_2", "column_3", "column_4__a", "column_4__b", "__sdp_file_name", "__sdp_ingest_timestamp", "__sdp_ingest_unix_timestamp"
   :align: center

   1, "this is some data", \*\*\*\*\*\*\*\*, 1, 2, /path/to/file.csv, 2022-04-06 11:05:11.056383, 1649235939
   2, "this is other data", \*\*\*\*\*\*\*\*, 3, 4, /path/to/file.csv, 2022-04-06 11:05:11.056383, 1649235939

Bronze (10)
-----------

The `bronze zone` in the data lake is the raw data zone and serves as the
prototypical lake, where massive amounts of data are stored. The data is not
filtered, modified, or deduplicated. This means that we must be able to
retrieve all records that we ever read into the bronze layer in a 1-to-1
fashion. Typically, error-prone parsing of data is avoided. Data should be as
`raw` as possible.

The `bronze zone` tracks all the history, which makes it possible to rebuild
entire historic views in the silver and gold zone. This also makes it possible
to go back in time to a certain moment and query a specific view of the data.

Suppose that we ingest the data from :numref:`snapshot_table` **two** times.
This would result in the following data in the bronze layer:

.. _bronze_table:

.. csv-table:: Example of bronze after reading the same data **two** times.
   :header: "column_1", "column_2", "column_3", "column_4__a", "column_4__b", "__sdp_file_name", "__sdp_ingest_timestamp", "__sdp_ingest_unix_timestamp"
   :align: center

   1, "this is some data", \*\*\*\*\*\*\*\*, 1, 2, /path/to/file.csv, 2022-04-06 11:05:11.056383, 1649235939
   2, "this is other data", \*\*\*\*\*\*\*\*, 3, 4, /path/to/file.csv, 2022-04-06 11:05:11.056383, 1649235939
   1, "this is some data", \*\*\*\*\*\*\*\*, 1, 2, /path/to/file.csv, 2022-04-06 11:10:12.056383, 1649237000
   2, "this is other data", \*\*\*\*\*\*\*\*, 3, 4, /path/to/file.csv, 2022-04-06 11:10:12.056383, 1649237000

Silver (20)
-----------

The silver data layer provides an accurate view of the current/latest data for
each dataset. It determines what is `latest` by using the ``keys`` directive in
the :ref:`YAML specification `. Data is queryable for easy debugging and ready
for consumption - mainly for system integrations, data analysts, or data
science purposes (the data is still in the native structure).

All modifications on the silver table are logged by using a built-in feature
called `Change Data Feed `_. This history table (also called `table_changes`)
shows how the data of the silver table changes through time, which can be
incredibly useful.

We will now show how the `silver` table and the `table_changes` table evolve
after ingesting multiple datasets consecutively. Suppose that we first ingest
the following data:

.. csv-table:: Sample snapshot to load in silver
   :header: "column_1", "column_2"
   :align: center

   1, "data_1"
   2, "data_2"
   3, "data_3"

We do this using the following :ref:`YAML specification `.
.. code-block:: yaml

   name: test_docs

   sources:
     - type: staging
       name: staging-source

   entities:
     - name: entity_1
       source:
         name: staging-source
         config:
           spark_read_options:
             file_type: CSV
             delimiter: ;
             charset: UTF-8
             header: True
       settings:
         keys:
           - column_1

This results in the following silver table
(``select * from 20_test_docs.entity_1``):

.. csv-table:: Silver after first ingest
   :header: "column_1", "column_2"
   :align: center

   1, "data_1"
   2, "data_2"
   3, "data_3"

Also, when we query the `table_changes`
(``SELECT * FROM table_changes('20_test_docs.entity_1', 0)``):

.. csv-table:: Silver `table_changes` after first ingest
   :header: "column_1", "column_2", "_change_type", "_commit_version", "_commit_timestamp"
   :align: center

   1, "data_1", "insert", 0, "2022-04-06T10:59:24.000+0000"
   2, "data_2", "insert", 0, "2022-04-06T10:59:24.000+0000"
   3, "data_3", "insert", 0, "2022-04-06T10:59:24.000+0000"

Next, we're going to ingest a new dataset:

.. csv-table:: New snapshot to ingest
   :header: "column_1", "column_2"
   :align: center

   2, "data_modified"
   3, "data_3"
   4, "data_added"

Note that we've removed the first row, modified the second row, kept the third
row unchanged, and added a fourth row. This results in the following silver
table (``select * from 20_test_docs.entity_1``):

.. csv-table:: Silver after second ingest
   :header: "column_1", "column_2"
   :align: center

   2, "data_modified"
   3, "data_3"
   4, "data_added"

When we query the `table_changes` now, we get an overview of the records that
have been changed:
.. csv-table:: Silver `table_changes` after second ingest
   :header: "column_1", "column_2", "_change_type", "_commit_version", "_commit_timestamp"
   :align: center

   1, "data_1", "insert", 0, "2022-04-06T10:59:24.000+0000"
   2, "data_2", "insert", 0, "2022-04-06T10:59:24.000+0000"
   3, "data_3", "insert", 0, "2022-04-06T10:59:24.000+0000"
   2, "data_2", "update_preimage", 1, "2022-04-06T11:05:02.000+0000"
   2, "data_modified", "update_postimage", 1, "2022-04-06T11:05:02.000+0000"
   4, "data_added", "insert", 1, "2022-04-06T11:05:02.000+0000"
   1, "data_1", "delete", 2, "2022-04-06T11:05:05.000+0000"

.. NOTE:: The ``keys`` that are not available in the latest `snapshot` are
   **deleted** from the silver layer. You can override this behaviour by
   setting ``silver_type`` as specified :ref:`here `.

Divergent Delta Lake Architecture: Accelerated Approach
-------------------------------------------------------

The Delta Lake architecture implemented in our system deviates slightly from
the traditional approach. In a traditional architecture, the data flows
through each layer of the architecture (from Bronze to Silver to Gold), while
incrementally and progressively improving the structure and quality of the
data. In our adapted approach, the data is simultaneously loaded into the
Bronze and Silver layers, where the Bronze layer maintains a full data dump
and the Silver layer only the latest/current data.

The primary motivation behind this divergent architecture is speed
optimization. By avoiding the need to load data into the Bronze layer before
transforming it to the Silver layer, we can achieve faster data processing and
minimize latency.

By implementing this approach, we have chosen not to perform transformations
directly from the Bronze to the Silver layer. Instead, we have adopted a
workflow that leverages notebooks for transformations, allowing each team to
maintain ownership of their work. These transformations can be performed from
the Bronze or Silver layer to the Gold layer.
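As a minimal illustration of this accelerated approach, the plain-Python
sketch below (hypothetical; the actual platform operates on Spark DataFrames
and Delta tables) loads each snapshot into both layers at once: Bronze is
append-only, while Silver keeps only the latest record per ``keys``, so keys
absent from the newest snapshot disappear from Silver.

```python
# Illustrative sketch only -- names and data structures are hypothetical,
# not the sdp-package implementation.

bronze: list[dict] = []   # full history, append-only
silver: dict = {}         # latest record per key tuple

def ingest(snapshot: list[dict], keys: list[str]) -> None:
    """Load one snapshot into Bronze and Silver simultaneously."""
    bronze.extend(snapshot)        # Bronze: every record is retained 1-to-1
    silver.clear()                 # Silver: replaced by the latest snapshot,
    for row in snapshot:           # so keys missing from it are deleted
        silver[tuple(row[k] for k in keys)] = row

ingest([{"column_1": 1, "column_2": "data_1"},
        {"column_1": 2, "column_2": "data_2"}], keys=["column_1"])
ingest([{"column_1": 2, "column_2": "data_modified"},
        {"column_1": 3, "column_2": "data_added"}], keys=["column_1"])

print(len(bronze))     # 4 -- both snapshots kept in full
print(sorted(silver))  # [(2,), (3,)] -- key 1 was deleted, key 3 was added
```

In the real platform the Silver update is a keyed merge on a Delta table (with
Change Data Feed recording the inserts, updates, and deletes shown above)
rather than a full replacement in memory.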
Lastly, while the Delta Lake architecture typically allows the Silver layer to
be built up from the Bronze layer automatically, our implementation requires a
manual action for this process. As a result, we take backups of the Silver
layer to mitigate the slight disadvantage of lacking an automated build-up
mechanism. These backups ensure data reliability and provide a fallback option
in case of any unexpected data inconsistencies or errors.
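For illustration, the manual rebuild mentioned above could look like the
following plain-Python sketch (hypothetical; the actual rebuild is a manual
Spark/Delta operation). Because Silver holds only the latest snapshot, one way
to recover it is to replay the most recent ingest batch from Bronze, selected
via the ``__sdp_ingest_unix_timestamp`` metadata column; keys absent from that
batch then stay deleted, matching the Silver semantics described earlier.

```python
# Hypothetical sketch of rebuilding Silver from the Bronze history;
# the sdp-package performs this as a manual operation on Delta tables.

def rebuild_silver(bronze_rows: list[dict], keys: list[str]) -> dict:
    """Replay the most recent ingest batch; absent keys remain deleted."""
    latest_ts = max(row["__sdp_ingest_unix_timestamp"] for row in bronze_rows)
    return {
        tuple(row[c] for c in keys): row
        for row in bronze_rows
        if row["__sdp_ingest_unix_timestamp"] == latest_ts
    }

bronze_rows = [
    {"column_1": 1, "column_2": "data_1", "__sdp_ingest_unix_timestamp": 1649235939},
    {"column_1": 2, "column_2": "data_2", "__sdp_ingest_unix_timestamp": 1649235939},
    {"column_1": 2, "column_2": "data_modified", "__sdp_ingest_unix_timestamp": 1649237000},
    {"column_1": 3, "column_2": "data_added", "__sdp_ingest_unix_timestamp": 1649237000},
]

print(sorted(rebuild_silver(bronze_rows, keys=["column_1"])))
# [(2,), (3,)] -- key 1 is absent from the latest batch, so it stays deleted
```

Since this replay only reconstructs the state after the latest ingest, the
Silver backups remain the fallback for recovering from inconsistencies that
predate the most recent batch.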