YAML

This section is a reference guide to the SDP Ingestion YAML schema. It lists all available YAML tags and options. An ingest YAML can be specified either as a separate YAML file or, from a notebook, with the %%ingest_run cell magic after running import sdp.ingest.

In order to use the magics, one should import the ingest package:

import sdp.ingest

Now you can specify the ingestion YAML inside a new notebook cell:

%%ingest_run
# YAML HERE

Root

The YAML describes the ingest of a group of entities (equivalent to tables) that belong to a system. An ingestion pipeline can only contain a single system. Each system contains one or more data sources, as denoted by the sources property.

name: string # Name of the system
max_workers: int # OPTIONAL number of parallel entity ingests, defaults to 8
comment: string  # OPTIONAL
sources:
- source 1
- source 2
- source ..
entities:
- entity 1
- entity 2
- entity ..

Sources

A source contains the information needed to connect to a data source. Each source has a name so that entities can reference it later. Currently the framework supports five types: mssql, smb, staging, azure-files and gbo.

...
sources:
    - name: string
      type: mssql | smb | staging | azure-files | gbo
      config: struct
...

Each source has its own distinct configuration structure.

MSSQL Source

This source reads data from a Microsoft SQL database. Its configuration is specified as sdp.ingest.sources.mssql.SqlSourceConfig.

source:
    name: string
    type: mssql
    config:
        server: string
        database: string
        auth_type: windows | sql | ad_sp | az_password
        username: string # Username for windows, sql and az_password, client_id for ad_sp
        password: string # Password for windows, sql and az_password, client_secret for ad_sp
        spark_read_options: dict
            # some examples that can be used here:
            dbtable: 'schema.tablename'
            query: str
            partitionColumn: str
            lowerBound: str
            upperBound: str
            numPartitions: str
            fetchsize: int
            option...: value..

Note

The options available for spark_read_options are a union of both the options described here and here

SMB Source

This source reads data from an SMB file share. The configuration of this source is described in sdp.ingest.sources.smb.SMBSourceConfig.

source:
    name: string
    type: smb
    config:
        username: string
        password: string
        auth_protocol: NTLM
        domain_controller: string
        root: string
        smb_file_pattern: string
        adls_file_pattern: string # OPTIONAL defaults to *.
        adls_file_anti_pattern: string # Optional, default None
        remove_source_files: string # OPTIONAL defaults to False
        zip_password: string # OPTIONAL defaults to None. Set it if the file is a zip file that requires a password. Only one password can be specified per source; it is used for every zip in that folder.
        raise_no_files_found_exception: bool # OPTIONAL defaults to False, if True raises an exception when no files are found
        spark_read_options:
          file_type: string # Required
          option_1: value_1 # Optional
          option_2: value_2 # Optional
          option_n: value_n # Optional
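The interplay between adls_file_pattern and adls_file_anti_pattern can be sketched in plain Python. This is only an illustration: it assumes that both options are glob patterns, that a file is kept when it matches the pattern and does not match the anti-pattern, and the helper name filter_files is hypothetical rather than part of sdp.ingest.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List, Optional


def filter_files(
    names: Iterable[str],
    pattern: str = "*",
    anti_pattern: Optional[str] = None,
) -> List[str]:
    """Keep names that match `pattern` and do not match `anti_pattern`."""
    kept = []
    for name in names:
        if not fnmatchcase(name, pattern):
            continue  # does not match the include pattern
        if anti_pattern is not None and fnmatchcase(name, anti_pattern):
            continue  # explicitly excluded by the anti-pattern
        kept.append(name)
    return kept


# Hypothetical file names, for illustration only.
files = ["orders_2024.csv", "orders_2024_backup.csv", "readme.txt"]
selected = filter_files(files, pattern="orders_*.csv", anti_pattern="*_backup.csv")
print(selected)
```

With the default pattern of * and no anti-pattern, every file is kept.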

Azure Files Source

This source reads data from an Azure file share using a service principal. The configuration of this source is described in sdp.ingest.sources.azure_files.AzureFilesSourceConfig. To ingest data, the service principal needs the role Storage File Data Privileged Contributor.

source:
    name: string
    type: azure-files
    config:
        dir_path: string # E.g. https://nubulosdpdlsdev01.file.core.windows.net/test-mike
        client_id: string
        client_secret: string
        tenant_id: string # Optional, defaults to SDP__CORE__TENANT_ID environment variable
        azure_file_pattern: string # REQUIRED Glob pattern to filter files in the fileshare. Defaults to *
        adls_file_pattern: string # OPTIONAL Glob pattern to filter files once they arrived in staging defaults to *.
        adls_file_anti_pattern: string # Optional, default None
        remove_source_files: string # OPTIONAL defaults to False
        zip_password: string # OPTIONAL defaults to None. Set it if the file is a zip file that requires a password. Only one password can be specified per source; it is used for every zip in that folder.
        raise_no_files_found_exception: bool # OPTIONAL defaults to False, if True raises an exception when no files are found
        spark_read_options:
          file_type: string # Required
          option_1: value_1 # Optional
          option_2: value_2 # Optional
          option_n: value_n # Optional

GBO Source

This source reads data from GBO .txt files on a file share. The platform assumes that each metadata file (with _META_ in the filename) can be matched with a data file. This data file must have exactly the same filename as the metadata file, only with _META_ removed from the filename.

source:
    name: string
    type: gbo
    config:
        username: string
        password: string
        auth_protocol: NTLM
        domain_controller: string
        root: string
        smb_file_pattern: string
        adls_file_pattern: string # OPTIONAL defaults to *.
        adls_file_anti_pattern: string # Optional, default None
        remove_source_files: string # OPTIONAL defaults to False
        metadata_dump_location: str # The location to which the metadata should be dumped. Defaults to 'bronze/gbo_metadata'

The metadata_dump_location is the location where the metadata from the metadata file (_META_) is dumped as JSON. This dump can later be used to validate the ingested GBO data. You normally do not modify this location. Each file pair generates a single row in this metadata location with a unique ID (UUID). The platform adds two columns to the ingested data: __gbo_metadata_location (pointing to the dumped metadata) and __gbo_metadata_id (pointing to the correct row of the dumped metadata). These two columns can be used to match (join) the ingested data with the GBO metadata.
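The file pairing described above can be sketched as follows. This is a minimal illustration, not the platform's implementation: the helper pair_gbo_files and the example file names are hypothetical, and the generated rows only mimic the idea of one UUID per file pair.

```python
import uuid
from typing import Iterable, List, Tuple


def pair_gbo_files(names: Iterable[str], marker: str = "_META_") -> List[Tuple[str, str]]:
    """Return (metadata_file, data_file) pairs found in `names`."""
    available = set(names)
    pairs = []
    for name in sorted(available):
        if marker not in name:
            continue  # not a metadata file
        # The data file has the same name with the marker removed.
        data_name = name.replace(marker, "", 1)
        if data_name in available:
            pairs.append((name, data_name))
    return pairs


# Hypothetical file names; the last one has no matching data file.
names = ["export_META_20240101.txt", "export20240101.txt", "orphan_META_.txt"]
pairs = pair_gbo_files(names)

# One metadata row per file pair, identified by a UUID.
metadata_rows = [
    {"__gbo_metadata_id": str(uuid.uuid4()), "metadata_file": meta, "data_file": data}
    for meta, data in pairs
]
print(pairs)
```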

Staging Source

This source reads data from the staging folder in the sdp container in ADLS Gen2. The corresponding Python object is documented in sdp.sources.smb.StagingSourceConfig.

source:
    name: string
    type: staging
    config:
        staging_path: string # OPTIONAL
        adls_file_pattern: string # OPTIONAL
        adls_file_anti_pattern: string # OPTIONAL
        zip_password: string # OPTIONAL defaults to None. Set it if the file is a zip file that requires a password. Only one password can be specified per source; it is used for every zip in that folder.
        raise_no_files_found_exception: bool # OPTIONAL defaults to False, if True raises an exception when no files are found
        spark_read_options: struct
          file_type: string # Required
          option_1: value_1 # Optional
          option_2: value_2 # Optional
          option_n: value_n # Optional

Spark Read Options

Any source can specify additional spark_read_options under source.config. These options are passed directly into spark.read. This makes the YAML very flexible, as it allows any generic option to be specified. Consult the Spark documentation for all available options for each format.

For files, the options are used as follows:

df = spark.read.options(**self.config.spark_read_options).format(self.config.spark_read_options['file_type']).load(files)

Note

This makes spark_read_options.file_type a required property.

The resulting spark read statement for SQL sources:

df = spark.read.format("jdbc").options(**spark_read_options).load()
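Because file_type is looked up directly in the options dictionary, a missing file_type would fail at read time. The sketch below shows one way to check for it up front without needing a Spark session. The helper split_file_read_options is hypothetical and not part of sdp.ingest.

```python
from typing import Any, Dict, Tuple


def split_file_read_options(spark_read_options: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return (format, options) for a file source, failing early if file_type is missing."""
    if "file_type" not in spark_read_options:
        raise ValueError("spark_read_options.file_type is required for file sources")
    # All options, including file_type, are still handed to spark.read.options(...).
    return spark_read_options["file_type"], dict(spark_read_options)


fmt, opts = split_file_read_options({"file_type": "csv", "header": "true", "sep": ";"})
# With a SparkSession available, this corresponds to:
#   spark.read.options(**opts).format(fmt).load(files)
print(fmt, opts)
```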

Username and password

Usernames are no longer stored in the keyvault; they are a regular part of the YAML. The password refers to a secret from the keyvault. The username refers to a variable in the YAML or a fixed value, but never to a secret from the keyvault.

Settings

settings:
    default_datatype: infer | string # How to handle datatypes in the `preprocess` stage defaults to ``infer``
    type_escalation_mode: strict | loose # How to handle datatype mismatches. Defaults to ``strict``
    can_rewrite_history: True | False # Can we rewrite the history in bronze/silver layers to solve type conflicts? Defaults to False as it creates a lot of extra data in the `table_changes` table (Change data feed).
    datatypes: # To which datatypes the columns should be converted (takes precedence over ``default_datatype`` and ``type_escalation_mode``)
    - col: column_name
      type: StringType | BinaryType | BooleanType | DateType | TimestampType | DecimalType | DecimalType(p,s) | DoubleType | FloatType | ByteType | IntegerType | LongType | ShortType
    ..
    silver_type: merge_and_delete | merge_only # defaults to merge_and_delete
    keys: # The columns for which to merge on in the silver layer
    - column_1
    - column_2
    - column ...
    delete_null_keys: False # Optional, should the platform remove rows where any of the keys are `Null`
    select: # OPTIONAL Specify only when a subset of the columns is required in bronze. Does not affect loading time from source. Can break when source data changes. Selects all child items automatically.
    - '[column_1]'
    - '[column_...]'
    exclude: # OPTIONAL Specify which columns NOT to include (child columns for XML are also excluded automatically.)
    - '[column_1]'
    - '[column_...]'
    max_depth: int
    maskings: # Specify which columns should be masked. UDF can be one of the functions as specified in sdp.udf.
    - col: column_name
      udf: privacy_timestamp_udf | privacy_date_udf | privacy_string_sha1_udf | privacy_string_udf | privacy_int_udf | privacy_long_udf | privacy_double_udf | privacy_binary_udf | privacy_boolean_udf | privacy_hide_email_udf
    ..
    ignore_missing_columns: bool # If True, gives an error when column in SELECT is missing from dataframe. Defaults to False
    columns_to_lowercase: bool # Defaults to True. Changes all columns to lowercase
    spark_config: dict # Spark config (https://spark.apache.org/docs/latest/configuration.html) settings to apply while ingesting entity. Defaults to None

Default Settings

Always fill in at least the settings default_datatype (default = infer), type_escalation_mode (default = strict), can_rewrite_history (default = False) and silver_type (default = merge_and_delete) even if the default value is used for this.

Default Datatype

Determines how the preprocess stage (before bronze/silver) handles columns for which no datatype has been specified (see below). infer uses inference to determine the datatype automatically. This might yield wrong results when the dataset is really small (small sample size). Alternatively, string converts all columns without a specified datatype to StringType. This is the safer option, but it loses type information.

Type Escalation mode

The data coming out of the preprocess stage might contain different datatypes than already present in the bronze/silver layer. type_escalation_mode specifies how to handle these type conflicts. In short, strict always tries to cast the data to a stricter datatype. Example: StringType + IntegerType -> IntegerType. loose tries to cast to a more loose datatype. Example: StringType + IntegerType -> StringType. Note, strict-mode always falls back on loose mode when the conversion fails. can_rewrite_history (see next paragraph) specifies whether the current bronze/silver tables can be rewritten to solve a type conflict (this property defaults to False).
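The difference between the two modes, including the fallback from strict to loose, can be illustrated with a small plain-Python sketch. It makes several assumptions: the ordering of types from strict to loose, the use of Python casts as stand-ins for Spark casts, and the function name escalate are all hypothetical and only serve the illustration.

```python
from typing import Callable, Dict, List

# Assumed ordering from strictest to loosest; the platform's real ordering may differ.
STRICT_TO_LOOSE: List[str] = ["IntegerType", "DoubleType", "StringType"]

# Plain-Python stand-ins for Spark casts.
CASTERS: Dict[str, Callable[[object], object]] = {
    "IntegerType": int,
    "DoubleType": float,
    "StringType": str,
}


def escalate(existing: str, incoming: str, values: List[object], mode: str = "strict") -> str:
    """Pick the datatype to use when `existing` and `incoming` conflict."""
    stricter, looser = sorted([existing, incoming], key=STRICT_TO_LOOSE.index)
    if mode == "loose":
        return looser
    try:
        # strict: only succeeds when every value can be cast to the stricter type.
        for value in values:
            CASTERS[stricter](value)
    except (TypeError, ValueError):
        return looser  # strict falls back on loose when the conversion fails
    return stricter


print(escalate("StringType", "IntegerType", ["1", "2"], mode="strict"))
print(escalate("StringType", "IntegerType", ["1", "2"], mode="loose"))
print(escalate("StringType", "IntegerType", ["1", "abc"], mode="strict"))
```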

Can Rewrite history

Specifies whether it is allowed to rewrite the delta tables in bronze/silver when performing datatype escalation. Rewriting history rewrites every record and this is represented in the Change Data Feed a.k.a. table_changes table. Therefore, we should be careful in using this option.

Datatypes

Sometimes you want manual control over the datatypes of the columns when reading data into the bronze/silver layer. The datatypes property allows you to specify a specific datatype for a column. The ingestion does not crash when the column specified is not available in the dataset. It throws a warning instead.

Silver type

The settings.silver_type determines how the silver layer is filled. When merge_and_delete is enabled (default), keys that do not exist in the latest read dataset are removed from the silver layer. When merge_only is set, those keys are NOT removed from the silver layer (even if the latest data does not contain them).
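The two silver types can be compared with a small in-memory sketch that uses lists of dictionaries in place of Delta tables. The function merge_silver and the example rows are hypothetical; the real merge runs on the silver tables.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def merge_silver(
    silver: List[Row],
    latest: List[Row],
    keys: Sequence[str],
    silver_type: str = "merge_and_delete",
) -> List[Row]:
    """Merge `latest` into `silver` on `keys`, optionally deleting vanished keys."""
    def key_of(row: Row) -> tuple:
        return tuple(row[k] for k in keys)

    merged = {key_of(row): row for row in silver}
    incoming = {key_of(row): row for row in latest}
    merged.update(incoming)  # update existing keys, insert new ones
    if silver_type == "merge_and_delete":
        # Remove keys that are absent from the latest read dataset.
        merged = {k: row for k, row in merged.items() if k in incoming}
    return list(merged.values())


silver = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
latest = [{"id": 2, "v": "B"}, {"id": 3, "v": "c"}]
with_delete = merge_silver(silver, latest, keys=["id"])
merge_only = merge_silver(silver, latest, keys=["id"], silver_type="merge_only")
print(with_delete)
print(merge_only)
```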

Delete Null Keys

Specifies whether the platform removes records where any of the keys is NULL. This might be useful with nested ingests where you only want the records that actually contain the selected keys.
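As a rough illustration of the effect, the following sketch filters a list of dictionaries, treating None as NULL. The helper drop_null_keys is hypothetical.

```python
from typing import Dict, List, Sequence


def drop_null_keys(rows: List[Dict[str, object]], keys: Sequence[str]) -> List[Dict[str, object]]:
    """Keep only rows in which every key column has a non-null value."""
    return [row for row in rows if all(row.get(k) is not None for k in keys)]


rows = [
    {"id": 1, "sub_id": 10, "v": "a"},
    {"id": 2, "sub_id": None, "v": "b"},  # removed: one of the keys is NULL
]
kept = drop_null_keys(rows, keys=["id", "sub_id"])
print(kept)
```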

Keys

Specify a list of keys here. These keys are used in the silver layer to determine on which records to merge, update, insert and delete. You can specify the keys in the following formats:

  • 'column'

  • '[column]'

  • column.nested_column

  • '[column].[nested_column]'

Note

Keys are not case sensitive.
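To show that the four notations refer to the same column, the sketch below reduces each of them to a single dotted, lowercase form. This is an assumption about how such references could be compared, not the parser's actual logic; normalize_column is a hypothetical helper and it does not handle column names that themselves contain dots.

```python
def normalize_column(reference: str) -> str:
    """Reduce 'column', '[column]' and '[a].[b]' style references to a dotted lowercase path."""
    cleaned = reference.strip().strip("'\"")  # drop surrounding quotes, if any
    parts = [part.strip().strip("[]") for part in cleaned.split(".")]
    return ".".join(part.lower() for part in parts)  # keys are not case sensitive


print(normalize_column("[Column].[Nested_Column]"))
print(normalize_column("column.nested_column"))
```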

Select

Sometimes you only want to save a subset of your dataset. When select columns are specified, only those columns are saved in bronze and silver.

You can specify the select columns in the following formats:

  • 'column'

  • '[column]'

  • column.nested_column

  • '[column].[nested_column]'

Note

Select statements are not case sensitive.

Exclude

Excluded columns will be excluded from the dataset. Note that exclude gets executed after the select. The syntax for specifying exclude columns is equal to that of the select statement.
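The order of operations (select first, then exclude) and the automatic inclusion of child columns can be sketched on a list of dotted column paths. The helper apply_select_exclude and the column names are hypothetical, and column references are assumed to be already normalized to dotted paths.

```python
from typing import List, Optional, Sequence


def apply_select_exclude(
    columns: Sequence[str],
    select: Optional[Sequence[str]] = None,
    exclude: Optional[Sequence[str]] = None,
) -> List[str]:
    """Apply select, then exclude; a reference also covers all of its child columns."""
    def covered(column: str, refs: Sequence[str]) -> bool:
        return any(column == ref or column.startswith(ref + ".") for ref in refs)

    result = list(columns)
    if select:
        result = [c for c in result if covered(c, select)]
    if exclude:
        result = [c for c in result if not covered(c, exclude)]
    return result


columns = ["id", "customer.name", "customer.address.city", "amount"]
remaining = apply_select_exclude(columns, select=["id", "customer"], exclude=["customer.address"])
print(remaining)
```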

Max depth

Specify an integer here if you load a nested datastructure and only want to load a certain depth.

Maskings

Specifies which masking functions should be applied on the columns. Can be useful for GDPR (NL: AVG) reasons.

Ignore Missing columns

Set this property to True to raise an error when a column in the select is not available in the given dataset. Excluded columns that are not available are ignored by default.

Columns to Lowercase

When set to True (the default), the process converts all column names to lowercase.

Mode

Refers to the way Spark handles corrupt records. Default is PERMISSIVE.

Options:

  • PERMISSIVE: handles corrupted records and provides flexibility when processing data with different schemas.

  • DROPMALFORMED: ignores corrupted records entirely.

  • FAILFAST: throws an exception when it encounters a corrupted record and stops the job.
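The three modes can be mimicked with a toy parser in plain Python. This is only an analogy to Spark's behavior, not Spark code: parse_amounts is a hypothetical function, and the _corrupt_record field merely imitates the column in which Spark can keep the raw malformed record in PERMISSIVE mode.

```python
from typing import Dict, List, Optional


def parse_amounts(raw_rows: List[str], mode: str = "PERMISSIVE") -> List[Dict[str, Optional[object]]]:
    """Parse integer amounts, handling malformed rows according to `mode`."""
    parsed = []
    for raw in raw_rows:
        try:
            parsed.append({"amount": int(raw), "_corrupt_record": None})
        except ValueError:
            if mode == "FAILFAST":
                raise  # stop at the first malformed row
            if mode == "DROPMALFORMED":
                continue  # silently skip the malformed row
            # PERMISSIVE: keep the row, null the field, retain the raw text
            parsed.append({"amount": None, "_corrupt_record": raw})
    return parsed


raw_rows = ["10", "oops", "30"]
print(parse_amounts(raw_rows, mode="PERMISSIVE"))
print(parse_amounts(raw_rows, mode="DROPMALFORMED"))
```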

Spark config

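Sets the Spark configuration to apply during the ingest. You can specify any of the settings listed in the Spark configuration documentation (https://spark.apache.org/docs/latest/configuration.html).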

Entities

Entities can be seen as the list of tables to read.

entities:
- name: str
  depends_on: Optional[str|List[str]] # Entities normally run in parallel. When this field is set, the entities it depends on are executed first.
  source: struct
    name: string # Should match any system.source.name
    config: Optional[struct] # OPTIONAL system.source.config is merged into this one
  settings: Optional[struct] # OPTIONAL system.ingest is merged into this one
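The effect of depends_on on execution order can be sketched with a topological sort. This is an illustration only: it assumes Python 3.9 or later for graphlib, the function execution_order and the entity names are hypothetical, and the real framework additionally runs independent entities in parallel.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_order(entities: List[dict]) -> List[str]:
    """Return entity names ordered so that dependencies come first."""
    graph: Dict[str, Set[str]] = {}
    for entity in entities:
        depends_on = entity.get("depends_on") or []
        if isinstance(depends_on, str):
            depends_on = [depends_on]  # depends_on may be a single name or a list
        graph[entity["name"]] = set(depends_on)
    # The graph maps each entity to its predecessors.
    return list(TopologicalSorter(graph).static_order())


entities = [
    {"name": "invoices", "depends_on": ["customers", "orders"]},
    {"name": "orders", "depends_on": "customers"},
    {"name": "customers"},
]
order = execution_order(entities)
print(order)
```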

Each entity can reference its sources from the parent source-list. In order to match both sources, the entity.source.name should match source.name. When two sources are matched, the YAML parser merges both source and entity.source, with the configuration on the entity taking precedence.

This works equivalently for ingest configurations: the YAML parser merges them with the configurations on the entity, again with the configuration of the entity taking precedence.

How system and entities are merged

Now, to show how the YAML parser merges the fields on the system with the entity, we take the following YAML input file:

name: test_ingest_sdp
comment: Comment here

sources:
- type: smb
  name: smb-source-pega
  config:
    username: ${BatchAccount-user}
    password: ${BatchAccount-pwd}
    auth_protocol: ntlm
    domain_controller: INS.LOCAL
    root: \\sdpfso.ins.local\DEV_InputData\Pega

settings:
  keys:
  - _id1

entities:
- name: test_entity_smb
  source:
    name: smb-source-pega
    config:
      smb_file_pattern: IL_Work_PolClosure_*.zip
      remove_source_files: False
      spark_read_options:
        file_type: XML
        multiline: True
        charset: UTF-8
        rowTag: item
  settings:
    keys:
    - _id2

The sources are merged with the entity.source by the source name. Furthermore, the settings are merged with the entity.settings. Thus, the YAML specification above is merged into the following specification:

name: test_ingest_sdp
comment: Comment here
entities:
- name: test_entity_smb
  source:
    type: smb
    name: smb-source-pega
    config:
      username: ${BatchAccount-user}
      password: ${BatchAccount-pwd}
      auth_protocol: ntlm
      domain_controller: INS.LOCAL
      root: \\sdpfso.ins.local\DEV_InputData\Pega
      smb_file_pattern: IL_Work_PolClosure_*.zip
      remove_source_files: False
      spark_read_options:
        file_type: XML
        multiline: True
        charset: UTF-8
        rowTag: item
  settings:
    keys:
    - _id1
    - _id2
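The merge shown above can be approximated with a recursive dictionary merge. This is a sketch under assumptions, not the parser's code: deep_merge is a hypothetical helper, scalar values on the entity are assumed to override the system-level values, and lists are assumed to be concatenated because the keys in the example end up as _id1 and _id2.

```python
from copy import deepcopy
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Merge `override` (entity level) into `base` (system level)."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)  # merge nested structs
        elif isinstance(value, list) and isinstance(merged.get(key), list):
            # Assumed list behavior: append entity items that are not yet present.
            merged[key] = merged[key] + [item for item in value if item not in merged[key]]
        else:
            merged[key] = deepcopy(value)  # the entity value takes precedence
    return merged


system_source = {"type": "smb", "name": "smb-source-pega",
                 "config": {"auth_protocol": "ntlm", "remove_source_files": True}}
entity_source = {"name": "smb-source-pega",
                 "config": {"smb_file_pattern": "IL_Work_PolClosure_*.zip",
                            "remove_source_files": False}}
merged_source = deep_merge(system_source, entity_source)
merged_settings = deep_merge({"keys": ["_id1"]}, {"keys": ["_id2"]})
print(merged_source)
print(merged_settings)
```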

Using secrets

Secrets are stored in the Azure Keyvault, which is maintained by the SDP Core team. To replace a piece of YAML with the content of a secret, you can use the following pattern: ${{ secrets.secretname }}. This allows for dynamically specifying the content of a secret anywhere in the YAML file. Secrets are replaced before any other actions are executed by the YAML parser.

Note

Secrets are automatically fetched from the keyvault in the corresponding environment (dev, tst, acc, prd).

For example the following YAML snippet:

..
source:
    ...
    config:
        tenant_id: ${{ secrets.TenantId }}

Is converted into:

..
source:
    ...
    config:
        tenant_id: e6f53d0a-0004-4a2a-84f7-3c394c783b99
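The substitution step can be sketched with a regular expression over the raw YAML text. In this illustration a plain dictionary stands in for the keyvault, the function substitute_secrets is hypothetical, and the secret value is a dummy.

```python
import re
from typing import Dict

# Matches ${{ secrets.name }} with optional whitespace inside the braces.
SECRET_PATTERN = re.compile(r"\$\{\{\s*secrets\.([A-Za-z0-9_-]+)\s*\}\}")


def substitute_secrets(yaml_text: str, secrets: Dict[str, str]) -> str:
    """Replace every ${{ secrets.name }} placeholder with the secret's value."""
    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in secrets:
            raise KeyError(f"Secret '{name}' not found")
        return secrets[name]

    return SECRET_PATTERN.sub(replace, yaml_text)


# A dictionary stands in for the keyvault here; the value is a dummy.
fake_vault = {"TenantId": "00000000-0000-0000-0000-000000000000"}
resolved = substitute_secrets("tenant_id: ${{ secrets.TenantId }}", fake_vault)
print(resolved)
```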

Using variables

Variables allow you to specify a different value in the YAML for each environment (dev, tst, acc, prd). The variable definition is placed at the root level of the YAML and should always contain a value for each environment.

We define a variable in the following manner:

..
variables:
- name: server_name
  values:
    dev: 'develop.server.nl'
    tst: 'test.server.nl'
    acc: 'accept.server.nl'
    prd: 'prod.server.nl'
..

Now we can use the variable anywhere in the code:

name: test_system_sdp
comment: 'Ingest for ${{ variables.server_name }}'
max_workers: 8
..
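Variable resolution can be sketched in the same way as a text substitution per environment. The function substitute_variables is hypothetical, and the variables list mirrors the structure of the variables definition, written as already-parsed Python data.

```python
import re
from typing import Dict, List

# Matches ${{ variables.name }} with optional whitespace inside the braces.
VARIABLE_PATTERN = re.compile(r"\$\{\{\s*variables\.([A-Za-z0-9_]+)\s*\}\}")


def substitute_variables(yaml_text: str, variables: List[dict], environment: str) -> str:
    """Replace ${{ variables.name }} with the value defined for `environment`."""
    values: Dict[str, str] = {v["name"]: v["values"][environment] for v in variables}
    return VARIABLE_PATTERN.sub(lambda match: values[match.group(1)], yaml_text)


variables = [
    {"name": "server_name",
     "values": {"dev": "develop.server.nl", "tst": "test.server.nl",
                "acc": "accept.server.nl", "prd": "prod.server.nl"}},
]
comment = substitute_variables("Ingest for ${{ variables.server_name }}", variables, "tst")
print(comment)
```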

Using the exclude option

If not all columns need to be read from an XML file, you can use the exclude option instead of listing every required column in the YAML.

For example:

..
entities:
- name:
  source:
  settings:
    exclude:
    - column_name

..

All-in-one demo

The following example combines the theory above into an example ingestion. It includes several sources and entities.