Wednesday, August 20, 2025

Azure Data Factory Metadata-Driven Pipelines - 2

As mentioned in the previous post, we're using some custom tables behind our metadata-driven pipeline to provide some flexibility and, hopefully, room to expand later. The schema is still a work in progress, and it has some obvious limitations that could be improved on once we have more time available.

Overview 

At a high level, the solution has some tables to hold the entities/tables to be loaded, along with column mappings, "high water" values for delta loads, and any pre-copy scripts to be run. Here's the ERD.

[ERD image]

Table                   Purpose
DataFeed                High-level data feed details
DataFeedEntity          Tables to load
ColumnName              Column names, to cut down duplication in table rows
DataFeedColumn          Source and destination columns
DataFeedEntityScript    The pre-copy script to run, if required
HighWaterValue          Column and value used for delta loads
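
The ERD covers the full schema, but as a rough sketch of the two central tables (column names come from the view further down; the types and constraints are my best guesses, not the actual DDL):

-- Approximate shape of the two central tables; types and constraints are guesses, not the real DDL
CREATE TABLE adf.DataFeedEntity (
    DataFeedEntityId    INT IDENTITY(1, 1) PRIMARY KEY,
    DataFeedId          INT NOT NULL,            -- FK to adf.DataFeed
    DataFeedEntityName  NVARCHAR(128) NOT NULL,  -- source entity name (also used as the destination table name)
    LoadType            VARCHAR(20) NOT NULL,    -- e.g. 'FullLoad' or 'DeltaLoad'
    EntityGroup         INT NOT NULL,            -- surfaced as TaskId in the view
    Active              BIT NOT NULL DEFAULT 1
);

CREATE TABLE adf.HighWaterValue (
    DataFeedEntityId    INT NOT NULL,            -- FK to adf.DataFeedEntity
    ColumnNameId        INT NOT NULL,            -- FK to adf.ColumnName (the watermark column)
    TimestampValue      DATETIME2 NOT NULL       -- last high water value, used for delta loads
);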


Most of the tables have an "Active" column, which gives us flexibility over which entities and columns are included in the ADF copy activities.
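
For example, dropping a table out of the load (or adding it back) is just a flag update rather than a pipeline change; the entity name here is just an example:

-- Temporarily exclude an entity from the copy activities
UPDATE adf.DataFeedEntity
SET Active = 0
WHERE DataFeedEntityName = 'SalesOrderHeader';  -- example entity name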

On top of the tables is a view which is used to present the table data to the ADF pipeline. In theory it should be possible to have multiple views for different data loads, but we haven't tested this yet. The easiest way to create the view is to take the table that the ADF wizard creates and recreate its output in the view; I'll include our current version below for reference. There's also a stored proc to update the high water values, and a table-valued function which returns a count of active entities to be loaded.
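
Neither of those is particularly interesting, but for completeness the rough shape is something like the following. The names and bodies here are simplified sketches rather than our actual versions.

-- Sketch of the high water update proc (simplified; not the exact implementation)
CREATE OR ALTER PROCEDURE adf.usp_UpdateHighWaterValue
    @DataFeedEntityId INT,
    @TimestampValue   DATETIME2
AS
BEGIN
    SET NOCOUNT ON;

    UPDATE adf.HighWaterValue
    SET TimestampValue = @TimestampValue
    WHERE DataFeedEntityId = @DataFeedEntityId;
END;
GO

-- Sketch of the table-valued function returning the count of active entities
CREATE OR ALTER FUNCTION adf.fn_ActiveEntityCount ()
RETURNS TABLE
AS
RETURN
(
    SELECT COUNT(*) AS ActiveEntityCount
    FROM adf.DataFeedEntity
    WHERE Active = 1
);
GO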

Limitations

There are a few baked-in limitations in our solution which were design choices based on our loads and on keeping it simple(ish) to start with. These should all be easy to adapt, and we'll probably look at that in the downtime. The key limitations are:

  • we assume the destination table has the same name as the source entity
  • we also assume that the target schema is dbo. This was a bit of laziness, and we'll probably add a destination schema column to the DataFeedColumn table in the near future
  • the data type columns in DataFeedColumn hold the ADF data types, e.g. String rather than varchar. We have a separate mapping table that we use when populating the table (there's a sketch of it after this list), but this could be added to the schema
  • we don't store connections in these tables, which could be a useful enhancement
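
As a rough illustration, the mapping table is nothing fancy; something along these lines would do the job. The name and the type pairs below are examples rather than our actual table.

-- Illustrative SQL-to-ADF data type mapping (name and contents are examples only)
CREATE TABLE adf.DataTypeMapping (
    SqlDataType VARCHAR(50) NOT NULL,  -- type as reported by e.g. INFORMATION_SCHEMA.COLUMNS
    AdfDataType VARCHAR(50) NOT NULL   -- the ADF interim type stored in DataFeedColumn
);

INSERT INTO adf.DataTypeMapping (SqlDataType, AdfDataType)
VALUES ('varchar',   'String'),
       ('nvarchar',  'String'),
       ('int',       'Int32'),
       ('bigint',    'Int64'),
       ('datetime2', 'DateTime'),
       ('bit',       'Boolean');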

Example SQL View

SELECT CONCAT (
            N'{
            "entityName": "',
            dfe.DataFeedEntityName,
            N'"
        }'
            ) AS SourceObjectSettings,

        NULL AS [SourceConnectionSettingsName],

        NULL AS [CopySourceSettings],

        CONCAT (
            N'{
            "schema": "dbo",
            "table": "',
            dfe.DataFeedEntityName,
            N'"
        }'
            ) AS [SinkObjectSettings],

        NULL AS [SinkConnectionSettingsName],

        CONCAT (N'{
            "preCopyScript": ', ISNULL(QUOTENAME(pre.ScriptBody, '"'), 'null'),
            ',
            "tableOption": null,
            "writeBehavior": ', CASE WHEN dfe.LoadType = 'FullLoad' THEN '"insert"' ELSE '"upsert"' END, ',
            "sqlWriterUseTableLock": true,
            "disableMetricsCollection": false,
            "upsertSettings": {
                "useTempDB": true,
                "keys": [
                    "', keycols.KeyColumnName, N'"
                ]
            }
        }') AS [CopySinkSettings],

        REPLACE(CAST(N'{
            "translator": {
                "type": "TabularTranslator", 
                "mappings": [{X}]
             }
          }' AS NVARCHAR(max)), N'{X}', ca.X) AS [CopyActivitySettings],

        N'MetadataDrivenCopyTask_ftq_TopLevel' AS [TopLevelPipelineName],

        N'[
            "Sandbox",
            "Manual"
        ]' AS [TriggerName],

        CONCAT (
            N'{
            "dataLoadingBehavior": "',
            dfe.LoadType,
            N'",',
            N'"watermarkColumnName": "',
            cm.ColumnName,
            N'",',
            N'"watermarkColumnType": "DateTime",',
            N'"watermarkColumnStartValue": "',
            convert(VARCHAR(40), hv.TimestampValue, 126),
            N'"',
            N'}'
            ) AS [DataLoadingBehaviorSettings],

        dfe.EntityGroup as [TaskId],

        dfe.Active [CopyEnabled],

        ROW_NUMBER() OVER(ORDER BY dfe.[EntityGroup], dfe.[DataFeedEntityId] DESC) AS RowNumber,

        dfe.DataFeedEntityId

    FROM adf.DataFeedEntity dfe
    JOIN (
        SELECT c.DataFeedEntityId,
            STRING_AGG(N'{"source":{"name":"' + cast(c.[SourceColumnName] AS NVARCHAR(max)) + N'",
            "type": "' + c.SourceDataType + '"},"sink":{"name":"' + cast(c.[DestinationColumnName] AS NVARCHAR(max)) + '"}}', ',') X
        FROM adf.vw_ColumnMapping c
        GROUP BY DataFeedEntityId
        ) ca ON ca.DataFeedEntityId = dfe.DataFeedEntityId
    LEFT JOIN adf.HighWaterValue hv ON dfe.DataFeedEntityId = hv.DataFeedEntityId
    LEFT JOIN adf.ColumnName cm ON hv.ColumnNameId = cm.ColumnNameId
    LEFT JOIN (
        SELECT  c.ColumnName as KeyColumnName, 
                dfc.DataFeedEntityId
        FROM [adf].[DataFeedColumn] dfc
        JOIN [adf].[ColumnName] c ON dfc.DestinationColumnNameId = c.ColumnNameId
        WHERE dfc.IsKey = 1
    ) keycols ON dfe.DataFeedEntityId = keycols.DataFeedEntityId
    LEFT JOIN adf.DataFeedEntityScript pre ON dfe.DataFeedEntityId = pre.DataFeedEntityId
        AND pre.ScriptType = 'PreCopy'
    WHERE dfe.Active = 1 
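
To sanity check what the pipeline will receive, we just query the view directly. The view name below is a placeholder; use whatever you called yours.

-- Preview the generated JSON settings for the active entities
SELECT TaskId,
       SourceObjectSettings,
       CopySinkSettings,
       DataLoadingBehaviorSettings
FROM adf.vw_DataFeedControl   -- placeholder name for the view above
WHERE CopyEnabled = 1
ORDER BY RowNumber;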

 

 

Part 1: Azure Data Factory Metadata-Driven Pipelines - 1
