As mentioned in the previous post, we're using some custom tables behind our metadata-driven pipeline to provide some flexibility and, hopefully, to make it extensible. The schema is still a work in progress and has some obvious limitations that we can improve on once we have more time.
Overview
At a high level, the solution has tables that hold the entities (tables) to be loaded, along with their column mappings, "high water" values for delta loads, and any pre-copy scripts to run. Here's the ERD.
| Table | Purpose |
|---|---|
| DataFeed | High-level data feed details |
| DataFeedEntity | Entities (tables) to load |
| ColumnName | Column names, stored once to cut down duplication across rows |
| DataFeedColumn | Source and destination columns |
| DataFeedEntityScript | Pre-copy script to run, if required |
| HighWaterValue | Column and value used for delta loads |
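For reference, here's a minimal sketch of how these tables might be defined. The columns that the example view further down actually references (DataFeedEntityName, LoadType, EntityGroup, Active, DestinationColumnNameId, IsKey, ScriptType, ScriptBody, ColumnNameId, TimestampValue) are taken from it. Everything else, including DataFeedName, SourceColumnNameId, SourceDataType living on DataFeedColumn, the surrogate keys, the data types and the constraints, is an assumption rather than our actual schema. It assumes SQL Server 2017 or later, which the view's use of STRING_AGG needs anyway.

```sql
-- Hypothetical DDL sketch of the control tables; names not used by the view are assumptions
CREATE SCHEMA adf;
GO

CREATE TABLE adf.DataFeed (
    DataFeedId   INT IDENTITY(1, 1) PRIMARY KEY,
    DataFeedName NVARCHAR(100) NOT NULL,              -- hypothetical
    Active       BIT NOT NULL DEFAULT (1)
);

CREATE TABLE adf.DataFeedEntity (
    DataFeedEntityId   INT IDENTITY(1, 1) PRIMARY KEY,
    DataFeedId         INT NOT NULL REFERENCES adf.DataFeed (DataFeedId),
    DataFeedEntityName NVARCHAR(128) NOT NULL,        -- also used as the destination table name
    LoadType           VARCHAR(20) NOT NULL CHECK (LoadType IN ('FullLoad', 'DeltaLoad')),
    EntityGroup        INT NOT NULL DEFAULT (0),      -- surfaced to ADF as TaskId
    Active             BIT NOT NULL DEFAULT (1)
);

-- Each column name is stored once and referenced by id
CREATE TABLE adf.ColumnName (
    ColumnNameId INT IDENTITY(1, 1) PRIMARY KEY,
    ColumnName   NVARCHAR(128) NOT NULL UNIQUE
);

CREATE TABLE adf.DataFeedColumn (
    DataFeedColumnId        INT IDENTITY(1, 1) PRIMARY KEY,
    DataFeedEntityId        INT NOT NULL REFERENCES adf.DataFeedEntity (DataFeedEntityId),
    SourceColumnNameId      INT NOT NULL REFERENCES adf.ColumnName (ColumnNameId),  -- hypothetical
    DestinationColumnNameId INT NOT NULL REFERENCES adf.ColumnName (ColumnNameId),
    SourceDataType          VARCHAR(30) NOT NULL,     -- ADF type name, e.g. 'String'
    IsKey                   BIT NOT NULL DEFAULT (0), -- upsert key column
    Active                  BIT NOT NULL DEFAULT (1)
);

CREATE TABLE adf.DataFeedEntityScript (
    DataFeedEntityScriptId INT IDENTITY(1, 1) PRIMARY KEY,
    DataFeedEntityId       INT NOT NULL REFERENCES adf.DataFeedEntity (DataFeedEntityId),
    ScriptType             VARCHAR(20) NOT NULL,      -- e.g. 'PreCopy'
    ScriptBody             NVARCHAR(MAX) NOT NULL,
    -- at most one script of each type per entity
    CONSTRAINT UQ_DataFeedEntityScript UNIQUE (DataFeedEntityId, ScriptType)
);

-- One watermark per entity; the primary key enforces that
CREATE TABLE adf.HighWaterValue (
    DataFeedEntityId INT NOT NULL PRIMARY KEY REFERENCES adf.DataFeedEntity (DataFeedEntityId),
    ColumnNameId     INT NOT NULL REFERENCES adf.ColumnName (ColumnNameId),
    TimestampValue   DATETIME2(3) NOT NULL
);
```

The unique constraint on DataFeedEntityScript and the primary key on HighWaterValue are there so that the view's LEFT JOINs can never return more than one row per entity.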
Most of the tables have an "Active" column, which lets us choose which entities and columns are included in the ADF copy activities.
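The example view further down reads its column mappings from adf.vw_ColumnMapping, whose definition isn't included in this post. As an illustration of how the Active flags can drop columns out of a copy, here's a sketch of what such a view might look like. It assumes DataFeedColumn has SourceColumnNameId, SourceDataType and Active columns alongside the DestinationColumnNameId column the main view uses, so treat it as illustrative rather than our actual definition.

```sql
-- Hypothetical sketch of adf.vw_ColumnMapping: one row per active column of an active entity
CREATE OR ALTER VIEW adf.vw_ColumnMapping
AS
SELECT dfc.DataFeedEntityId,
       src.ColumnName AS SourceColumnName,
       dst.ColumnName AS DestinationColumnName,
       dfc.SourceDataType
FROM adf.DataFeedColumn dfc
JOIN adf.DataFeedEntity dfe ON dfe.DataFeedEntityId = dfc.DataFeedEntityId
JOIN adf.ColumnName src ON src.ColumnNameId = dfc.SourceColumnNameId
JOIN adf.ColumnName dst ON dst.ColumnNameId = dfc.DestinationColumnNameId
WHERE dfc.Active = 1   -- inactive columns are left out of the TabularTranslator mappings
  AND dfe.Active = 1;  -- and so are all columns of inactive entities
```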
On top of the tables is a view that presents the table data to the ADF pipeline. In theory it should be possible to have multiple views for different data loads, but we haven't tested this yet. The easiest way to create the view is to take the control table that the ADF wizard generates and recreate its output in the view. Our current version is included below for reference. There's also a stored proc to update the high water values, and a table-valued function that returns a count of the active entities to be loaded.
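A rough sketch of the kind of stored proc and table-valued function described above follows. The object names (adf.usp_UpdateHighWaterValue, adf.tvf_ActiveEntityCount), the parameters and the forward-only update rule are all assumptions. The proc also assumes a HighWaterValue row has already been seeded for each delta-load entity and that the pipeline passes in the new watermark after a successful copy.

```sql
-- Hypothetical: record the new high water value after a successful delta load
CREATE OR ALTER PROCEDURE adf.usp_UpdateHighWaterValue
    @DataFeedEntityId INT,
    @NewValue         DATETIME2(3)
AS
BEGIN
    SET NOCOUNT ON;

    -- only ever move the watermark forwards, so a rerun can't rewind it
    UPDATE adf.HighWaterValue
    SET TimestampValue = @NewValue
    WHERE DataFeedEntityId = @DataFeedEntityId
      AND TimestampValue < @NewValue;
END;
GO

-- Hypothetical: single-row count of the active entities to be loaded
CREATE OR ALTER FUNCTION adf.tvf_ActiveEntityCount ()
RETURNS TABLE
AS
RETURN
    SELECT COUNT(*) AS ActiveEntityCount
    FROM adf.DataFeedEntity
    WHERE Active = 1;
GO
```

Usage would then be along the lines of `EXEC adf.usp_UpdateHighWaterValue @DataFeedEntityId = 1, @NewValue = '2024-01-31T00:00:00';` and `SELECT ActiveEntityCount FROM adf.tvf_ActiveEntityCount();`.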
Limitations
There are a few baked-in limitations in our solution, which were design choices based on our loads and on keeping things simple(ish) to start with. They should all be easy to adapt, and we'll probably look at them during the downtime. The key limitations are:
- we assume the destination table has the same name as the source entity
- we also assume that the target schema is dbo. This was a bit of laziness, and we'll probably add a destination schema column to the DataFeedColumn table in the near future
- the data type columns in DataFeedColumn hold ADF data types, e.g. String rather than varchar. We have a separate mapping table that we use when populating DataFeedColumn, but this could be added to the schema
- we don't store connections in these tables, which could be a useful enhancement
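If the type mapping were folded into the schema, it could be as simple as a lookup table keyed on the SQL Server type name. The table name adf.DataTypeMapping is hypothetical, and the rows are only a partial list based on the SQL Server to ADF interim type mappings in the ADF SQL Server connector documentation; check there for the full set before relying on it.

```sql
-- Hypothetical lookup from SQL Server type names to ADF interim types (partial list)
CREATE TABLE adf.DataTypeMapping (
    SqlDataType NVARCHAR(128) NOT NULL PRIMARY KEY,
    AdfDataType VARCHAR(30)   NOT NULL
);

INSERT INTO adf.DataTypeMapping (SqlDataType, AdfDataType)
VALUES (N'bigint', 'Int64'),
       (N'bit', 'Boolean'),
       (N'date', 'DateTime'),
       (N'datetime', 'DateTime'),
       (N'datetime2', 'DateTime'),
       (N'datetimeoffset', 'DateTimeOffset'),
       (N'decimal', 'Decimal'),
       (N'float', 'Double'),
       (N'int', 'Int32'),
       (N'nvarchar', 'String'),
       (N'smallint', 'Int16'),
       (N'uniqueidentifier', 'Guid'),
       (N'varchar', 'String');

-- Example: ADF type for every column of the existing dbo tables; unmapped types come back NULL
SELECT c.TABLE_NAME, c.COLUMN_NAME, c.DATA_TYPE, m.AdfDataType
FROM INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN adf.DataTypeMapping m ON m.SqlDataType = c.DATA_TYPE
WHERE c.TABLE_SCHEMA = N'dbo';
```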
Example SQL View
```sql
SELECT
    -- Source: the entity name is used as-is
    CONCAT(N'{
        "entityName": "', dfe.DataFeedEntityName, N'"
    }') AS [SourceObjectSettings],
    NULL AS [SourceConnectionSettingsName],
    NULL AS [CopySourceSettings],
    -- Sink: same table name as the source entity, always in dbo
    CONCAT(N'{
        "schema": "dbo",
        "table": "', dfe.DataFeedEntityName, N'"
    }') AS [SinkObjectSettings],
    NULL AS [SinkConnectionSettingsName],
    -- STRING_ESCAPE rather than QUOTENAME: QUOTENAME returns NULL for input longer than
    -- 128 characters and doubles embedded quotes instead of JSON-escaping them
    CONCAT(N'{
        "preCopyScript": ', ISNULL(N'"' + STRING_ESCAPE(pre.ScriptBody, 'json') + N'"', N'null'), N',
        "tableOption": null,
        "writeBehavior": ', CASE WHEN dfe.LoadType = 'FullLoad' THEN N'"insert"' ELSE N'"upsert"' END, N',
        "sqlWriterUseTableLock": true,
        "disableMetricsCollection": false,
        "upsertSettings": {
            "useTempDB": true,
            "keys": [', keycols.KeyColumnNames, N']
        }
    }') AS [CopySinkSettings],
    -- Column mappings: {X} is replaced with the aggregated mapping objects from ca
    REPLACE(CAST(N'{
        "translator": {
            "type": "TabularTranslator",
            "mappings": [{X}]
        }
    }' AS NVARCHAR(MAX)), N'{X}', ca.X) AS [CopyActivitySettings],
    N'MetadataDrivenCopyTask_ftq_TopLevel' AS [TopLevelPipelineName],
    N'[
        "Sandbox",
        "Manual"
    ]' AS [TriggerName],
    -- Delta loads start from the stored high water value
    CONCAT(N'{
        "dataLoadingBehavior": "', dfe.LoadType, N'",
        "watermarkColumnName": "', cm.ColumnName, N'",
        "watermarkColumnType": "DateTime",
        "watermarkColumnStartValue": "', CONVERT(VARCHAR(40), hv.TimestampValue, 126), N'"
    }') AS [DataLoadingBehaviorSettings],
    dfe.EntityGroup AS [TaskId],
    dfe.Active AS [CopyEnabled],
    ROW_NUMBER() OVER (ORDER BY dfe.[EntityGroup], dfe.[DataFeedEntityId] DESC) AS [RowNumber],
    dfe.DataFeedEntityId
FROM adf.DataFeedEntity dfe
-- One row per entity holding its comma-separated list of source/sink mapping objects
JOIN (
    SELECT c.DataFeedEntityId,
           STRING_AGG(N'{"source":{"name":"' + CAST(c.[SourceColumnName] AS NVARCHAR(MAX))
                      + N'","type":"' + c.SourceDataType
                      + N'"},"sink":{"name":"' + CAST(c.[DestinationColumnName] AS NVARCHAR(MAX))
                      + N'"}}', N',') AS X
    FROM adf.vw_ColumnMapping c
    GROUP BY c.DataFeedEntityId
) ca ON ca.DataFeedEntityId = dfe.DataFeedEntityId
LEFT JOIN adf.HighWaterValue hv ON dfe.DataFeedEntityId = hv.DataFeedEntityId
LEFT JOIN adf.ColumnName cm ON hv.ColumnNameId = cm.ColumnNameId
-- Key columns aggregated per entity, so a composite key doesn't duplicate the entity's row
LEFT JOIN (
    SELECT dfc.DataFeedEntityId,
           STRING_AGG(CAST(N'"' + c.ColumnName + N'"' AS NVARCHAR(MAX)), N',') AS KeyColumnNames
    FROM [adf].[DataFeedColumn] dfc
    JOIN [adf].[ColumnName] c ON dfc.DestinationColumnNameId = c.ColumnNameId
    WHERE dfc.IsKey = 1
    GROUP BY dfc.DataFeedEntityId
) keycols ON dfe.DataFeedEntityId = keycols.DataFeedEntityId
LEFT JOIN adf.DataFeedEntityScript pre ON dfe.DataFeedEntityId = pre.DataFeedEntityId
    AND pre.ScriptType = 'PreCopy'
WHERE dfe.Active = 1
```
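Because the JSON in this view is assembled by string concatenation, it's easy to end up with an unbalanced brace or an unescaped quote that ADF only complains about at run time. A quick check is to run each generated settings column through ISJSON (SQL Server 2016 and later). The view name adf.vw_DataFeedControl is hypothetical; substitute whatever your view is called.

```sql
-- List any entity whose generated settings don't parse as JSON (view name is hypothetical)
SELECT v.DataFeedEntityId,
       ISJSON(v.SourceObjectSettings)        AS SourceObjectOk,
       ISJSON(v.SinkObjectSettings)          AS SinkObjectOk,
       ISJSON(v.CopySinkSettings)            AS CopySinkOk,
       ISJSON(v.CopyActivitySettings)        AS CopyActivityOk,
       ISJSON(v.DataLoadingBehaviorSettings) AS DataLoadingOk,
       ISJSON(v.TriggerName)                 AS TriggerNameOk
FROM adf.vw_DataFeedControl v
WHERE ISJSON(v.SourceObjectSettings) = 0
   OR ISJSON(v.SinkObjectSettings) = 0
   OR ISJSON(v.CopySinkSettings) = 0
   OR ISJSON(v.CopyActivitySettings) = 0
   OR ISJSON(v.DataLoadingBehaviorSettings) = 0
   OR ISJSON(v.TriggerName) = 0;
```

An empty result set means every generated settings column parses as JSON; a row with a 0 in one of the columns points straight at the entity and setting to fix.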