YAML
This section is a reference guide to the SDP Ingestion YAML schema. It lists all available YAML tags and options.
An ingest YAML can be specified either as a separate YAML file or via the %%ingest_run cell magic from a notebook. In order to use the magic, first import the ingest package:
import sdp.ingest
You can then specify the ingestion YAML inside a new notebook cell:
%%ingest_run
# YAML HERE
Root
The YAML describes the ingestion of a group of entities (equivalent to tables). An ingestion pipeline can only contain a single system. Each system contains one or more data sources, denoted by the sources property, and one or more entities.
name: string # Name of the system
max_workers: int # OPTIONAL, the number of parallel entity ingests; defaults to 8
comment: string # OPTIONAL
sources:
  - source 1
  - source 2
  - source ..
entities:
  - entity 1
  - entity 2
  - entity ..
Sources
A source contains information on how to connect to a data source. We give the source a name so it can be referenced later from an entity. Currently the framework supports five types: mssql, smb, staging, azure-files and gbo.
...
sources:
- name: string
type: mssql | smb | staging | azure-files | gbo
config: struct
...
Each source has its own distinct configuration structure.
MSSQL Source
This source reads data from a Microsoft SQL database. Its configuration is specified as sdp.ingest.sources.mssql.SqlSourceConfig.
source:
  name: string
  type: mssql
  config:
    server: string
    database: string
    auth_type: windows | sql | ad_sp | az_password
    username: string # Username for windows, sql and az_password; client_id for ad_sp
    password: string # Password for windows, sql and az_password; client_secret for ad_sp
    spark_read_options: dict
      # Some examples that can be used here:
      dbtable: 'schema.tablename'
      query: str
      partitionColumn: str
      lowerBound: str
      upperBound: str
      numPartitions: str
      fetchsize: int
      option...: value..
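As an illustration, a filled-in MSSQL source could look like the following sketch. The server, database, table, login, and secret names are all hypothetical:

```yaml
sources:
  - name: mssql-source-example       # hypothetical source name
    type: mssql
    config:
      server: example.server.nl      # hypothetical server
      database: ExampleDB            # hypothetical database
      auth_type: sql
      username: sdp_reader           # hypothetical SQL login
      password: ${{ secrets.ExampleDbPassword }}  # hypothetical secret name
      spark_read_options:
        dbtable: 'dbo.customers'     # hypothetical table
        fetchsize: 10000
```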
SMB Source
This source reads data from an SMB file share. The configuration of this source is described in sdp.ingest.sources.smb.SMBSourceConfig.
source:
  name: string
  type: smb
  config:
    username: string
    password: string
    auth_protocol: NTLM
    domain_controller: string
    root: string
    smb_file_pattern: string
    adls_file_pattern: string # OPTIONAL, defaults to *
    adls_file_anti_pattern: string # OPTIONAL, defaults to None
    remove_source_files: string # OPTIONAL, defaults to False
    zip_password: string # OPTIONAL, defaults to None; use when the file is a zip file that requires a password. Only one password can be specified per source; it is used for every zip in that folder.
    raise_no_files_found_exception: bool # OPTIONAL, defaults to False; if True, raises an exception when no files are found
    spark_read_options:
      file_type: string # REQUIRED
      option_1: value_1 # OPTIONAL
      option_2: value_2 # OPTIONAL
      option_n: value_n # OPTIONAL
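A minimal filled-in SMB source might look as follows. The share path, credentials, and file pattern are hypothetical placeholders:

```yaml
sources:
  - name: smb-source-example                 # hypothetical source name
    type: smb
    config:
      username: sdp_smb_user                 # hypothetical account
      password: ${{ secrets.SmbPassword }}   # hypothetical secret name
      auth_protocol: NTLM
      domain_controller: EXAMPLE.LOCAL       # hypothetical domain controller
      root: \\fileserver.example.local\InputData  # hypothetical share
      smb_file_pattern: 'export_*.csv'       # hypothetical pattern
      spark_read_options:
        file_type: csv
        header: True
```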
Azure Files Source
This source reads data from an Azure Files share using a service principal. The configuration of this source is described in sdp.ingest.sources.azure_files.AzureFilesSourceConfig. To ingest data, you need the Storage File Data Privileged Contributor role.
source:
  name: string
  type: azure-files
  config:
    dir_path: string # E.g. https://nubulosdpdlsdev01.file.core.windows.net/test-mike
    client_id: string
    client_secret: string
    tenant_id: string # OPTIONAL, defaults to the SDP__CORE__TENANT_ID environment variable
    azure_file_pattern: string # Glob pattern to filter files in the file share; defaults to *
    adls_file_pattern: string # OPTIONAL, glob pattern to filter files once they have arrived in staging; defaults to *
    adls_file_anti_pattern: string # OPTIONAL, defaults to None
    remove_source_files: string # OPTIONAL, defaults to False
    zip_password: string # OPTIONAL, defaults to None; use when the file is a zip file that requires a password. Only one password can be specified per source; it is used for every zip in that folder.
    raise_no_files_found_exception: bool # OPTIONAL, defaults to False; if True, raises an exception when no files are found
    spark_read_options:
      file_type: string # REQUIRED
      option_1: value_1 # OPTIONAL
      option_2: value_2 # OPTIONAL
      option_n: value_n # OPTIONAL
GBO Source
This source reads GBO .txt files from a file share. The platform assumes that each metadata file (with _META_ in the filename) can be matched with a data file. This data file must have exactly the same filename as the metadata file, only with _META_ removed from the filename.
source:
  name: string
  type: gbo
  config:
    username: string
    password: string
    auth_protocol: NTLM
    domain_controller: string
    root: string
    smb_file_pattern: string
    adls_file_pattern: string # OPTIONAL, defaults to *
    adls_file_anti_pattern: string # OPTIONAL, defaults to None
    remove_source_files: string # OPTIONAL, defaults to False
    metadata_dump_location: str # The location to which the metadata should be dumped. Defaults to 'bronze/gbo_metadata'
The metadata_dump_location is the location where the metadata from the metadata file (_META_) is dumped as JSON. This metadata dump can be used to validate the ingested GBO data in the future. You normally do not modify this location. Each file pair generates a single row in this metadata location with a unique ID (UUID). The platform adds two columns to the ingested data: __gbo_metadata_location (pointing to the dumped metadata) and __gbo_metadata_id (pointing to the correct row of the dumped metadata). These two columns can be used to join the ingested data with the GBO metadata.
Staging Source
This source reads data from the staging folder of the sdp container in ADLS Gen2. The corresponding Python object is documented in sdp.sources.smb.StagingSourceConfig.
source:
  name: string
  type: staging
  config:
    staging_path: string # OPTIONAL
    adls_file_pattern: string # OPTIONAL
    adls_file_anti_pattern: string # OPTIONAL
    zip_password: string # OPTIONAL, defaults to None; use when the file is a zip file that requires a password. Only one password can be specified per source; it is used for every zip in that folder.
    raise_no_files_found_exception: bool # OPTIONAL, defaults to False; if True, raises an exception when no files are found
    spark_read_options: struct
      file_type: string # REQUIRED
      option_1: value_1 # OPTIONAL
      option_2: value_2 # OPTIONAL
      option_n: value_n # OPTIONAL
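A minimal staging source could look like this sketch, assuming a hypothetical staging path and CSV files:

```yaml
sources:
  - name: staging-source-example           # hypothetical source name
    type: staging
    config:
      staging_path: staging/example_system # hypothetical path
      adls_file_pattern: '*.csv'
      spark_read_options:
        file_type: csv
        header: True
```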
Spark Read Options
Any source can specify additional spark_read_options under source.config. These options are passed directly into spark.read, which makes the YAML very flexible: any generic Spark option can be specified. Consult the Spark documentation for all available options for each format.
For file-based sources this is used as:
df = spark.read.options(**self.config.spark_read_options).format(self.config.spark_read_options['file_type']).load(files)
Note
This makes spark_read_options.file_type a required property.
The resulting spark read statement for SQL sources:
df = spark.read.format("jdbc").options(**spark_read_options).load()
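For example, reading semicolon-delimited CSV files could be configured as follows. The header, sep, and inferSchema options are standard Spark CSV reader options; file_type is consumed by the platform itself as described above:

```yaml
spark_read_options:
  file_type: csv     # required by the platform, selects the Spark format
  header: True       # first line contains column names
  sep: ';'           # semicolon-delimited files
  inferSchema: True  # let Spark infer column types
```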
Username and password
Usernames are no longer stored in the keyvault; they have become a regular part of the YAML. The password refers to a secret from the keyvault; the username refers to a variable in the YAML or a fixed value, but never to a secret from the keyvault.
Settings
settings:
  default_datatype: infer | string # How to handle datatypes in the `preprocess` stage; defaults to ``infer``
  type_escalation_mode: strict | loose # How to handle datatype mismatches. Defaults to ``strict``
  can_rewrite_history: True | False # Can we rewrite the history in the bronze/silver layers to solve type conflicts? Defaults to False, as it creates a lot of extra data in the `table_changes` table (Change Data Feed).
  datatypes: # To which datatypes the columns should be converted (takes precedence over ``default_datatype`` and ``type_escalation_mode``)
    - col: column_name
      type: StringType | BinaryType | BooleanType | DateType | TimestampType | DecimalType | DecimalType(p,s) | DoubleType | FloatType | ByteType | IntegerType | LongType | ShortType
    ..
  silver_type: merge_and_delete | merge_only # Defaults to merge_and_delete
  keys: # The columns to merge on in the silver layer
    - column_1
    - column_2
    - column ...
  delete_null_keys: False # OPTIONAL, should the platform remove rows where any of the keys are `Null`
  select: # OPTIONAL, specify only when a subset of the columns is required in bronze. Does not affect loading time from the source. Can break when the source data changes. Selects all child items automatically.
    - '[column_1]'
    - '[column_...]'
  exclude: # OPTIONAL, specify which columns NOT to include (child columns for XML are also excluded automatically)
    - '[column_1]'
    - '[column_...]'
  max_depth: int
  maskings: # Specify which columns should be masked. UDF can be one of the functions specified in sdp.udf.
    - col: column_name
      udf: privacy_timestamp_udf | privacy_date_udf | privacy_string_sha1_udf | privacy_string_udf | privacy_int_udf | privacy_long_udf | privacy_double_udf | privacy_binary_udf | privacy_boolean_udf | privacy_hide_email_udf
    ..
  ignore_missing_columns: bool # If True, a column in the SELECT that is missing from the dataframe is ignored instead of raising an error. Defaults to False
  columns_to_lowercase: bool # Defaults to True. Changes all column names to lowercase
  spark_config: dict # Spark config (https://spark.apache.org/docs/latest/configuration.html) settings to apply while ingesting the entity. Defaults to None
Default Settings
Always fill in at least the settings default_datatype (default = infer), type_escalation_mode (default = strict), can_rewrite_history (default = False) and silver_type (default = merge_and_delete), even when the default value is used.
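A settings block that spells out these four defaults explicitly looks like:

```yaml
settings:
  default_datatype: infer
  type_escalation_mode: strict
  can_rewrite_history: False
  silver_type: merge_and_delete
```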
Default Datatype
How to handle datatypes in the preprocess stage (before bronze/silver) for which no datatype has been specified (see below).
infer uses inference to automatically determine the datatype. This might yield wrong results when the dataset is very small (small sample size).
string, by contrast, converts all columns for which no datatype has been specified to StringType. This is a safer option, but loses type information.
Type Escalation mode
The data coming out of the preprocess stage might contain different datatypes than already present in the bronze/silver layer.
type_escalation_mode specifies how to handle these type conflicts. In short, strict always tries to cast the data to a stricter datatype. Example: StringType + IntegerType -> IntegerType.
loose tries to cast to a more loose datatype. Example: StringType + IntegerType -> StringType. Note, strict-mode always falls back on loose mode when the conversion fails.
can_rewrite_history (see next paragraph) specifies whether the current bronze/silver tables can be rewritten to solve a type conflict (this property defaults to False).
Can Rewrite history
Specifies whether it is allowed to rewrite the delta tables in bronze/silver when performing datatype escalation. Rewriting history rewrites every record and this is represented in the Change Data Feed a.k.a. table_changes table. Therefore, we should be careful in using this option.
Datatypes
Sometimes you want manual control over the datatypes of the columns when reading data into the bronze/silver layer. The datatypes property allows you to specify a specific datatype for a column. The ingestion does not crash when the column specified is not available in the dataset. It throws a warning instead.
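For example, forcing two columns to a specific type while inferring the rest could look as follows (the column names are hypothetical):

```yaml
settings:
  default_datatype: infer
  datatypes:
    - col: order_id          # hypothetical column
      type: LongType
    - col: amount            # hypothetical column
      type: DecimalType(18,2)
```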
Silver type
The settings.silver_type determines how the silver layer is filled. When merge_and_delete is enabled (the default), keys that do not exist in the latest read dataset are removed from the silver layer. When merge_only is set, those keys are NOT removed from the silver layer (even if the latest data does not contain them).
Delete Null Keys
Specifies whether the platform removes records where any of the keys contain NULLs. This might be useful with nested ingests where you only want the records that actually contain the selected keys.
Keys
Specify a list of keys here. These keys are used in the silver layer to determine on which records to merge, update, insert and delete. You can specify the keys in the following formats:
‘column’
‘[column]’
column.nested_column
‘[column].[nested_column]’
Note
Keys are not case sensitive.
Select
Sometimes you only want to save a subset of your dataset. Specifying select columns will only save those in bronze and silver.
You can specify the select columns in the following formats:
‘column’
‘[column]’
column.nested_column
‘[column].[nested_column]’
Note
Select statements are not case sensitive.
Exclude
Excluded columns are removed from the dataset. Note that exclude is executed after select. The syntax for specifying exclude columns is the same as for select.
Max depth
Specify an integer here if you load a nested datastructure and only want to load a certain depth.
Maskings
Specifies which masking functions should be applied on the columns. Can be useful for GDPR (NL: AVG) reasons.
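For example, masking a hypothetical email column and birthdate column with the UDFs listed in the settings schema above:

```yaml
settings:
  maskings:
    - col: email        # hypothetical column
      udf: privacy_hide_email_udf
    - col: birthdate    # hypothetical column
      udf: privacy_date_udf
```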
Ignore Missing columns
Set this property to True to ignore columns in the SELECT that are not available in the given dataset, instead of raising an error. Excluded columns that are not available are ignored by default.
Columns to Lowercase
Defaults to True. When set to True, the process converts all column names to lowercase.
Mode
Refers to the way Spark handles corrupt records. Default is PERMISSIVE.
Options:
PERMISSIVE: handles corrupted records and provides flexibility when processing data with different schemas.
DROPMALFORMED: drops corrupted records entirely.
FAILFAST: throws an exception when it encounters a corrupted record and stops the job.
Spark config
Set the Spark configuration during ingest. You can specify all the spark settings available here.
Entities
Entities can be seen as the list of tables to read.
entities:
  - name: str
    depends_on: Optional[str|List[str]] # Entities are normally executed in parallel. However, when this field is filled, the entities it depends on are executed first.
    source: struct
      name: string # Should match a system.source.name
      config: Optional[struct] # OPTIONAL, system.source.config is merged into this one
    settings: Optional[struct] # OPTIONAL, the system-level settings are merged into this one
Each entity references its source from the parent source list. To match the two, entity.source.name should match source.name. When two sources are matched, the YAML parser merges source and entity.source, with the configuration on the entity taking precedence.
This works equivalently for the settings: those are also merged by the YAML parser with the settings on the entity, again with the entity taking precedence.
How system and entities are merged
Now, to show how the YAML parser merges the fields on the system with the entity, we take the following YAML input file:
name: test_ingest_sdp
comment: Comment here
sources:
  - type: smb
    name: smb-source-pega
    config:
      username: ${BatchAccount-user}
      password: ${BatchAccount-pwd}
      auth_protocol: ntlm
      domain_controller: INS.LOCAL
      root: \\sdpfso.ins.local\DEV_InputData\Pega
settings:
  keys:
    - _id1
entities:
  - name: test_entity_smb
    source:
      name: smb-source-pega
      config:
        smb_file_pattern: IL_Work_PolClosure_*.zip
        remove_source_files: False
        spark_read_options:
          file_type: XML
          multiline: True
          charset: UTF-8
          rowTag: item
    settings:
      keys:
        - _id2
The sources are merged with the entity.source by the source name.
Furthermore, the system-level settings are merged with entity.settings.
Thus, the YAML specification above is merged into the following specification:
name: test_ingest_sdp
comment: Comment here
entities:
  - name: test_entity_smb
    source:
      type: smb
      name: smb-source-pega
      config:
        username: ${BatchAccount-user}
        password: ${BatchAccount-pwd}
        auth_protocol: ntlm
        domain_controller: INS.LOCAL
        root: \\sdpfso.ins.local\DEV_InputData\Pega
        smb_file_pattern: IL_Work_PolClosure_*.zip
        remove_source_files: False
        spark_read_options:
          file_type: XML
          multiline: True
          charset: UTF-8
          rowTag: item
    settings:
      keys:
        - _id1
        - _id2
Using secrets
Secrets are stored in the Azure Keyvault, which is maintained by the SDP Core team.
To replace a piece of YAML with the content of a secret, you can use the following pattern: ${{ secrets.secretname }}.
This allows for dynamically specifying the content of a secret anywhere in the YAML file.
Secrets are replaced before any other actions are executed by the YAML parser.
Note
Secrets are automatically fetched from the keyvault in the corresponding environment (dev, tst, acc, prd).
For example the following YAML snippet:
..
source:
  ...
  config:
    tenant_id: ${{ secrets.TenantId }}
Is converted into:
..
source:
  ...
  config:
    tenant_id: e6f53d0a-0004-4a2a-84f7-3c394c783b99
Using variables
Variables allow you to specify a different value in the YAML for each environment (dev, tst, acc, prd). The variable definition is placed at the root level of the YAML and should always contain a value for each environment.
We define a variable in the following manner:
..
variables:
  - name: server_name
    values:
      dev: 'develop.server.nl'
      tst: 'test.server.nl'
      acc: 'accept.server.nl'
      prd: 'prod.server.nl'
..
Now we can use the variable anywhere in the YAML:
name: test_system_sdp
comment: 'Ingest for ${{ variables.server_name }}'
max_workers: 8
..
Using the exclude option
If not all columns need to be read from an XML file, you can use the exclude option instead of listing the specific columns in the YAML.
For example:
..
entities:
  - name:
    source:
    settings:
      exclude:
        - column_name
..
All-in-one demo
The following example combines the theory above into an example ingestion. It includes several sources and entities.