.. _yaml_reference:

YAML
====

This section is a reference guide to the SDP Ingestion YAML schema. It states all the possible YAML tags and options available. An ingest YAML can be specified either as a separate YAML file or via the ``%%ingest_run`` cell magic from a notebook.

In order to use the magics, import the ingest package:

.. code-block:: python

    import sdp.ingest

Now you can specify the ingestion YAML inside a new notebook cell:

.. code-block:: yaml

    %%ingest_run
    # YAML HERE

Root
----

The YAML describes an ingest of a group of ``entities`` (equivalent to tables). An ingestion pipeline can only contain a single system. Each system contains one or more data sources, as denoted by the ``sources`` property.

.. code-block:: yaml

    name: string      # Name of the system
    max_workers: int  # OPTIONAL, the number of parallel entity ingests, defaults to 8
    comment: string   # OPTIONAL
    sources:
      - source 1
      - source 2
      - source ..
    entities:
      - entity 1
      - entity 2
      - entity ..

Sources
-------

A source contains information on how to connect to a data source. We give the source a ``name`` so that it can be referenced later from the ``entity``. Currently the framework supports five types: mssql, smb, staging, azure-files and gbo.

.. code-block:: yaml

    ...
    sources:
      - name: string
        type: mssql | smb | staging | azure-files | gbo
        config: struct
    ...

Each source has its own distinct configuration structure.

MSSQL Source
************

This source reads data from a Microsoft SQL database. Its configuration is specified as :py:class:`sdp.ingest.sources.mssql.SqlSourceConfig`.

.. code-block:: yaml

    source:
      name: string
      type: mssql
      config:
        server: string
        database: string
        auth_type: windows | sql | ad_sp | az_password
        username: string  # Username for windows, sql and az_password; client_id for ad_sp
        password: string  # Password for windows, sql and az_password; client_secret for ad_sp
        spark_read_options: dict
          # Some examples that can be used here:
          dbtable: 'schema.tablename'
          query: str
          partitionColumn: str
          lowerBound: str
          upperBound: str
          numPartitions: str
          fetchsize: int
          option...: value..

.. note:: The options available for ``spark_read_options`` are a **union** of the options described `here `_ and `here `_.

SMB Source
**********

This source reads data from an SMB file share. The configuration of this source is described in :py:class:`sdp.ingest.sources.smb.SMBSourceConfig`.

.. code-block:: yaml

    source:
      name: string
      type: smb
      config:
        username: string
        password: string
        auth_protocol: NTLM
        domain_controller: string
        root: string
        smb_file_pattern: string
        adls_file_pattern: string       # OPTIONAL, defaults to *
        adls_file_anti_pattern: string  # OPTIONAL, defaults to None
        remove_source_files: string     # OPTIONAL, defaults to False
        zip_password: string            # OPTIONAL, defaults to None. Use when the file is a zip file that requires a password. Only one password can be specified per source; it is used for every zip in that folder.
        raise_no_files_found_exception: bool  # OPTIONAL, defaults to False. If True, raises an exception when no files are found
        spark_read_options:
          file_type: string  # REQUIRED
          option_1: value_1  # OPTIONAL
          option_2: value_2  # OPTIONAL
          option_n: value_n  # OPTIONAL

Azure Files Source
******************

This source reads data from an Azure File share using a service principal. The configuration of this source is described in :py:class:`sdp.ingest.sources.azure_files.AzureFilesSourceConfig`. To ingest data, you need the role `Storage File Data Privileged Contributor`.

.. code-block:: yaml

    source:
      name: string
      type: azure-files
      config:
        dir_path: string       # E.g. https://nubulosdpdlsdev01.file.core.windows.net/test-mike
        client_id: string
        client_secret: string
        tenant_id: string      # OPTIONAL, defaults to the SDP__CORE__TENANT_ID environment variable
        azure_file_pattern: string      # REQUIRED. Glob pattern to filter files in the file share. Defaults to *
        adls_file_pattern: string       # OPTIONAL. Glob pattern to filter files once they have arrived in staging. Defaults to *
        adls_file_anti_pattern: string  # OPTIONAL, defaults to None
        remove_source_files: string     # OPTIONAL, defaults to False
        zip_password: string            # OPTIONAL, defaults to None. Use when the file is a zip file that requires a password. Only one password can be specified per source; it is used for every zip in that folder.
        raise_no_files_found_exception: bool  # OPTIONAL, defaults to False. If True, raises an exception when no files are found
        spark_read_options:
          file_type: string  # REQUIRED
          option_1: value_1  # OPTIONAL
          option_2: value_2  # OPTIONAL
          option_n: value_n  # OPTIONAL

GBO Source
**********

This source reads GBO ``.txt`` files from a file share. The platform assumes that each metadata file (with `_META_` in the filename) can be matched with a data file. This data file must have exactly the same filename as the metadata file, only with `_META_` removed from the filename.

.. code-block:: yaml

    source:
      name: string
      type: gbo
      config:
        username: string
        password: string
        auth_protocol: NTLM
        domain_controller: string
        root: string
        smb_file_pattern: string
        adls_file_pattern: string       # OPTIONAL, defaults to *
        adls_file_anti_pattern: string  # OPTIONAL, defaults to None
        remove_source_files: string     # OPTIONAL, defaults to False
        metadata_dump_location: str     # The location to which the metadata should be dumped. Defaults to 'bronze/gbo_metadata'

The ``metadata_dump_location`` is the location where the metadata from the metadata file (`_META_`) is dumped as JSON.
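The metadata-to-data filename pairing described above can be sketched as a small helper. ``data_filename_for`` is a hypothetical illustration of the naming convention, not part of the platform API:

```python
# Hypothetical helper illustrating the GBO filename convention: a metadata
# file carries "_META_" in its name, and the matching data file has exactly
# the same name with "_META_" removed.

def data_filename_for(meta_filename: str) -> str:
    """Derive the data filename that pairs with a GBO metadata filename."""
    if "_META_" not in meta_filename:
        raise ValueError(f"Not a GBO metadata file: {meta_filename}")
    return meta_filename.replace("_META_", "", 1)
```

For example, a metadata file named ``GBO_META_20240101.txt`` would pair with the data file ``GBO20240101.txt``.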
This metadata dump can be used to validate the ingested GBO data in the future. You normally do not modify this location. Each file pair generates a single row in this metadata location with a unique ID (UUID).

The platform adds two columns to the ingested data: `__gbo_metadata_location` (pointing to the dumped metadata) and `__gbo_metadata_id` (pointing to the correct row of the dumped metadata). These two columns can be used to match (join) the ingested data with the GBO metadata.

Staging Source
**************

This source reads data from the `staging` folder of the ``sdp`` container in ADLS Gen2. The corresponding Python object is documented in :py:class:`sdp.sources.smb.StagingSourceConfig`.

.. code-block:: yaml

    source:
      name: string
      type: staging
      config:
        staging_path: string            # OPTIONAL
        adls_file_pattern: string       # OPTIONAL
        adls_file_anti_pattern: string  # OPTIONAL
        zip_password: string            # OPTIONAL, defaults to None. Use when the file is a zip file that requires a password. Only one password can be specified per source; it is used for every zip in that folder.
        raise_no_files_found_exception: bool  # OPTIONAL, defaults to False. If True, raises an exception when no files are found
        spark_read_options: struct
          file_type: string  # REQUIRED
          option_1: value_1  # OPTIONAL
          option_2: value_2  # OPTIONAL
          option_n: value_n  # OPTIONAL

Spark Read Options
******************

Any source can specify additional ``spark_read_options`` under ``source.config``. These options are passed directly into ``spark.read``. This makes the YAML very flexible, as it allows any generic option to be specified. Consult the Spark documentation for all available options for each format.

For files this is used as:

.. code-block:: python

    df = (
        spark.read.options(**self.config.spark_read_options)
        .format(self.config.spark_read_options['file_type'])
        .load(files)
    )

.. note:: This makes ``spark_read_options.file_type`` a required property.

The resulting spark read statement for SQL sources:

.. code-block:: python

    df = spark.read.format("jdbc").options(**spark_read_options).load()

Username and password
*********************

Usernames are no longer stored in the keyvault; they have become a regular part of the YAML. The password refers to a secret from the keyvault. The username refers to a variable in the YAML or a fixed value, but never to a secret from the keyvault.

Settings
--------

.. code-block:: yaml

    settings:
      default_datatype: infer | string      # How to handle datatypes in the `preprocess` stage, defaults to ``infer``
      type_escalation_mode: strict | loose  # How to handle datatype mismatches, defaults to ``strict``
      can_rewrite_history: True | False     # Can the history in the bronze/silver layers be rewritten to solve type conflicts? Defaults to False, as it creates a lot of extra data in the `table_changes` table (Change Data Feed)
      datatypes:  # The datatypes to which the columns should be converted (takes precedence over ``default_datatype`` and ``type_escalation_mode``)
        - col: column_name
          type: StringType | BinaryType | BooleanType | DateType | TimestampType | DecimalType | DecimalType(p,s) | DoubleType | FloatType | ByteType | IntegerType | LongType | ShortType
        ..
      silver_type: merge_and_delete | merge_only  # Defaults to merge_and_delete
      keys:  # The columns to merge on in the silver layer
        - column_1
        - column_2
        - column ...
      delete_null_keys: False  # OPTIONAL, should the platform remove rows where any of the keys are `Null`?
      select:  # OPTIONAL. Specify only when only a subset of the columns is required in bronze. Does not affect loading time from source. Can break when source data changes. Selects all child items automatically.
        - '[column_1]'
        - '[column_...]'
      exclude:  # OPTIONAL. Specify which columns NOT to include (child columns for XML are also excluded automatically)
        - '[column_1]'
        - '[column_...]'
      max_depth: int
      maskings:  # Specify which columns should be masked. UDF can be one of the functions specified in sdp.udf.
        - col: column_name
          udf: privacy_timestamp_udf | privacy_date_udf | privacy_string_sha1_udf | privacy_string_udf | privacy_int_udf | privacy_long_udf | privacy_double_udf | privacy_binary_udf | privacy_boolean_udf | privacy_hide_email_udf
        ..
      ignore_missing_columns: bool  # If True, raises an error when a column in the SELECT is missing from the dataframe. Defaults to False
      columns_to_lowercase: bool    # Defaults to True. Changes all column names to lowercase
      spark_config: dict            # Spark config settings (https://spark.apache.org/docs/latest/configuration.html) to apply while ingesting the entity. Defaults to None

Default Settings
****************

Always fill in at least the settings ``default_datatype`` (default = infer), ``type_escalation_mode`` (default = strict), ``can_rewrite_history`` (default = False) and ``silver_type`` (default = merge_and_delete), even if the default value is used.

Default Datatype
****************

How to handle datatypes in the `preprocess` stage (before bronze/silver) for columns for which no ``datatype`` has been specified (see below). ``infer`` uses inference to automatically determine the datatype. This might yield wrong results when the dataset is really small (small sample size). ``string``, by contrast, converts all columns for which no datatype has been specified to ``StringType``. This is a safer option, but loses type information.

Type Escalation mode
********************

The data coming out of the `preprocess` stage might contain different datatypes than those already present in the bronze/silver layer. ``type_escalation_mode`` specifies **how** to handle these type conflicts. In short, ``strict`` always tries to cast the data to a `stricter` datatype. Example: StringType + IntegerType -> IntegerType. ``loose`` tries to cast to a `looser` datatype. Example: StringType + IntegerType -> StringType. Note that `strict` mode always falls back on `loose` mode when the conversion fails.
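The two modes can be illustrated with a toy sketch. This is *not* the platform's implementation, and the strictness ordering below is an assumption covering only the example types above:

```python
# Toy illustration of type escalation, NOT the platform's implementation.
# Assumed strictness ordering: StringType is the loosest (anything can be
# rendered as a string), IntegerType the strictest of the three.
STRICTNESS = {"StringType": 0, "DoubleType": 1, "IntegerType": 2}

def escalate(existing: str, incoming: str, mode: str) -> str:
    """Resolve a type conflict between an existing and an incoming column type."""
    key = STRICTNESS.__getitem__
    if mode == "strict":
        return max((existing, incoming), key=key)  # prefer the stricter type
    return min((existing, incoming), key=key)      # "loose": prefer the looser type

# Matching the examples above:
# escalate("StringType", "IntegerType", "strict") -> "IntegerType"
# escalate("StringType", "IntegerType", "loose")  -> "StringType"
```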
``can_rewrite_history`` (see next paragraph) specifies whether the current bronze/silver tables can be rewritten to solve a type conflict (this property defaults to False).

Can Rewrite history
*******************

Specifies whether it is allowed to rewrite the delta tables in bronze/silver when performing datatype escalation. Rewriting history rewrites every record, and this is reflected in the `Change Data Feed`, a.k.a. the `table_changes` table. Therefore, use this option with care.

Datatypes
*********

Sometimes you want manual control over the datatypes of the columns when reading data into the bronze/silver layer. The ``datatypes`` property allows you to specify a specific datatype for a column. The ingestion does *not* crash when a specified column is not available in the dataset; it throws a *warning* instead.

Silver type
***********

The ``settings.silver_type`` determines how the `silver` layer is filled. When `merge_and_delete` is enabled (the default), keys that do not exist in the latest read dataset are removed from the silver layer. When `merge_only` is set, those keys are *NOT* removed from the silver layer (even if the latest data does not contain them).

Delete Null Keys
****************

Specifies whether the platform removes records where any of the keys contain `NULL`'s. Might be useful with nested ingests where you only want the records that actually contain the selected keys.

Keys
****

Specify a list of keys here. These keys are used in the silver layer to determine which records to merge, update, insert and delete. You can specify the keys in the following formats:

* 'column'
* '[column]'
* column.nested_column
* '[column].[nested_column]'

.. note:: Keys are not case sensitive.

Select
******

Sometimes you only want to save a subset of your dataset. Specifying select columns will only save those in bronze and silver.
You can specify the select columns in the following formats:

* 'column'
* '[column]'
* column.nested_column
* '[column].[nested_column]'

.. note:: Select statements are not case sensitive.

Exclude
*******

Excluded columns will be excluded from the dataset. Note that `exclude` is executed after the `select`. The syntax for specifying exclude columns is the same as for the select statement.

Max depth
*********

Specify an integer here if you load a nested data structure and only want to load it up to a certain depth.

Maskings
********

Specifies which masking functions should be applied to the columns. Can be useful for GDPR (NL: AVG) reasons.

Ignore Missing columns
**********************

Set this property to `True` when a column in the SELECT that is not available in the given dataset should raise an error. Excluded columns that are not available are ignored by default.

Columns to Lowercase
********************

When set to True (the default), the process converts all column names to lowercase.

Mode
****

Refers to the way Spark handles corrupt records. Default is PERMISSIVE. Options:

* PERMISSIVE: handles corrupted records and provides flexibility when processing data with different schemas.
* DROPMALFORMED: ignores whole corrupted records.
* FAILFAST: throws an exception when it meets corrupted records and stops the job.

Spark config
************

Set the Spark configuration during ingest. You can specify all the Spark settings available `here `_.

Entities
--------

Entities can be seen as the list of `tables` to read.

.. code-block:: yaml

    entities:
      - name: str
        depends_on: Optional[str|List[str]]  # Entities are normally executed in parallel. However, when this field is filled, the entities depended on are executed first.
        source: struct
          name: string               # Should match a system.source.name
          config: Optional[struct]   # OPTIONAL, system.source.config is merged into this one
        settings: Optional[struct]   # OPTIONAL, system.ingest is merged into this one

Each entity can reference its sources from the parent source list. In order to match the two, ``entity.source.name`` should match ``source.name``. When two sources are matched, the YAML parser merges `source` and `entity.source`, with the configuration on the entity taking precedence.

This works equivalently for ``ingest`` configurations. Those configurations are also merged by the YAML parser with the configurations on the entity, again with the configuration of the entity taking precedence.

How system and entities are merged
----------------------------------

To show how the YAML parser merges the fields on the system with those on the entity, take the following YAML input file:

.. code-block:: yaml

    name: test_ingest_sdp
    comment: Comment here
    sources:
      - type: smb
        name: smb-source-pega
        config:
          username: ${BatchAccount-user}
          password: ${BatchAccount-pwd}
          auth_protocol: ntlm
          domain_controller: INS.LOCAL
          root: \\sdpfso.ins.local\DEV_InputData\Pega
    settings:
      keys:
        - _id1
    entities:
      - name: test_entity_smb
        source:
          name: smb-source-pega
          config:
            smb_file_pattern: IL_Work_PolClosure_*.zip
            remove_source_files: False
            spark_read_options:
              file_type: XML
              multiline: True
              charset: UTF-8
              rowTag: item
        settings:
          keys:
            - _id2

The ``sources`` are merged with the ``entity.source`` by source name. Furthermore, the ``settings`` are merged with the ``entity.settings``. Thus, the YAML specification above is merged into the following specification:

.. code-block:: yaml

    name: test_ingest_sdp
    comment: Comment here
    entities:
      - name: test_entity_smb
        source:
          type: smb
          name: smb-source-pega
          config:
            username: ${BatchAccount-user}
            password: ${BatchAccount-pwd}
            auth_protocol: ntlm
            domain_controller: INS.LOCAL
            root: \\sdpfso.ins.local\DEV_InputData\Pega
            smb_file_pattern: IL_Work_PolClosure_*.zip
            remove_source_files: False
            spark_read_options:
              file_type: XML
              multiline: True
              charset: UTF-8
              rowTag: item
        settings:
          keys:
            - _id1
            - _id2

Using secrets
-------------

Secrets are stored in the Azure Keyvault, which is maintained by the SDP Core team. To replace a piece of YAML with the content of a secret, use the following pattern: ``${{ secrets.secretname }}``. This allows for dynamically specifying the content of a secret anywhere in the YAML file. Secrets are replaced before any other actions are executed by the YAML parser.

.. note:: Secrets are automatically fetched from the keyvault in the corresponding environment (dev, tst, acc, prd).

For example, the following YAML snippet:

.. code-block:: yaml

    ..
    source:
      ...
      config:
        tenant_id: ${{ secrets.TenantId }}

is converted into:

.. code-block:: yaml

    ..
    source:
      ...
      config:
        tenant_id: e6f53d0a-0004-4a2a-84f7-3c394c783b99

Using variables
---------------

Variables allow specifying a different value in the YAML for each environment (dev, tst, acc, prd). The variable definition is placed at the `root` level of the YAML and should always contain a value for each environment. We define a variable in the following manner:

.. code-block:: yaml

    ..
    variables:
      - name: server_name
        values:
          dev: 'develop.server.nl'
          tst: 'test.server.nl'
          acc: 'accept.server.nl'
          prd: 'prod.server.nl'
    ..

Now we can use the variable anywhere in the YAML:

.. code-block:: yaml

    name: test_system_sdp
    comment: 'Ingest for ${{ variables.server_name }}'
    max_workers: 8
    ..
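The per-environment substitution described above can be sketched with a small regex. ``resolve_variables`` is a hypothetical illustration of the mechanism, not the parser's actual code:

```python
import re

def resolve_variables(text: str, variables: list, env: str) -> str:
    """Replace ${{ variables.name }} placeholders with the value for one environment."""
    lookup = {var["name"]: var["values"][env] for var in variables}
    pattern = re.compile(r"\$\{\{\s*variables\.(\w+)\s*\}\}")
    return pattern.sub(lambda m: lookup[m.group(1)], text)

# The server_name variable from the definition above:
variables = [
    {"name": "server_name",
     "values": {"dev": "develop.server.nl", "tst": "test.server.nl",
                "acc": "accept.server.nl", "prd": "prod.server.nl"}},
]

print(resolve_variables("Ingest for ${{ variables.server_name }}", variables, "dev"))
# Ingest for develop.server.nl
```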
Using the exclude option
------------------------

If not all columns need to be read from an XML file, you can use the exclude option instead of listing the desired columns explicitly in the YAML. For example:

.. code-block:: yaml

    ..
    entities:
      - name:
        source:
        settings:
          exclude: column_name
    ..

All-in-one demo
---------------

The following example combines the theory above into an example ingestion. It includes several sources and entities.

.. literalinclude:: ../../../../sdp-ingest/examples/demo_ingest.yaml
   :language: yaml