ADF V2 - параметризация конвейера копирования данных на основе столбца таблицы - PullRequest
0 голосов
/ 18 сентября 2018

с помощью Azure Data Factory V2 через портал

https://adf.azure.com

Я создал конвейер для инкрементного копирования данных из нескольких таблиц из одного Azure SQLбазы данных в другую базу данных SQL Azure.

Для его создания я адаптировал следующий пример к своим потребностям: Инкрементная загрузка данных из нескольких таблиц

Ниже приведен файл json, связанный с созданным конвейером:

{
"name": "IncrementalCopyPipeline",
"properties": {
    "activities": [
        {
            "name": "IterateSQLTables",
            "type": "ForEach",
            "typeProperties": {
                "items": {
                    "value": "@pipeline().parameters.tableList",
                    "type": "Expression"
                },
                "activities": [
                    {
                        "name": "LookupOldWaterMarkActivity",
                        "type": "Lookup",
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false,
                            "secureInput": false
                        },
                        "typeProperties": {
                            "source": {
                                "type": "SqlSource",
                                "sqlReaderQuery": {
                                    "value": "select * \nfrom watermarktable \nwhere TableName  =  '@{item().TABLE_NAME}'",
                                    "type": "Expression"
                                }
                            },
                            "dataset": {
                                "referenceName": "WatermarkDataset",
                                "type": "DatasetReference"
                            }
                        }
                    },
                    {
                        "name": "LookupNewWaterMarkActivity",
                        "type": "Lookup",
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false,
                            "secureInput": false
                        },
                        "typeProperties": {
                            "source": {
                                "type": "SqlSource",
                                "sqlReaderQuery": {
                                    "value": "select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue \nfrom @{item().TABLE_NAME}",
                                    "type": "Expression"
                                }
                            },
                            "dataset": {
                                "referenceName": "SourceDataset",
                                "type": "DatasetReference"
                            }
                        }
                    },
                    {
                        "name": "IncrementalCopyActivity",
                        "type": "Copy",
                        "dependsOn": [
                            {
                                "activity": "LookupNewWaterMarkActivity",
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            },
                            {
                                "activity": "LookupOldWaterMarkActivity",
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            }
                        ],
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false,
                            "secureInput": false
                        },
                        "typeProperties": {
                            "source": {
                                "type": "SqlSource",
                                "sqlReaderQuery": {
                                    "value": "select * from @{item().TABLE_NAME} \nwhere @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                    "type": "Expression"
                                }
                            },
                            "sink": {
                                "type": "SqlSink",
                                "writeBatchSize": 10000,
                                "sqlWriterStoredProcedureName": {
                                    "value": "@{item().StoredProcedureNameForMergeOperation}",
                                    "type": "Expression"
                                },
                                "sqlWriterTableType": {
                                    "value": "@{item().TableType}",
                                    "type": "Expression"
                                }
                            },
                            "enableStaging": false,
                            "dataIntegrationUnits": 0
                        },
                        "inputs": [
                            {
                                "referenceName": "SourceDataset",
                                "type": "DatasetReference"
                            }
                        ],
                        "outputs": [
                            {
                                "referenceName": "SinkDataset",
                                "type": "DatasetReference",
                                "parameters": {
                                    "SinkTableName": "@{item().TABLE_NAME}"
                                }
                            }
                        ]
                    },
                    {
                        "name": "StoredProceduretoWriteWatermarkActivity",
                        "type": "SqlServerStoredProcedure",
                        "dependsOn": [
                            {
                                "activity": "IncrementalCopyActivity",
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            }
                        ],
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false,
                            "secureInput": false
                        },
                        "typeProperties": {
                            "storedProcedureName": "[dbo].[sp_write_watermark]",
                            "storedProcedureParameters": {
                                "LastModifiedtime": {
                                    "value": {
                                        "value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                        "type": "Expression"
                                    },
                                    "type": "DateTime"
                                },
                                "TableName": {
                                    "value": {
                                        "value": "@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                        "type": "Expression"
                                    },
                                    "type": "String"
                                }
                            }
                        },
                        "linkedServiceName": {
                            "referenceName": "SqlServerLinkedService_dest",
                            "type": "LinkedServiceReference"
                        }
                    }
                ]
            }
        }
    ],
    "parameters": {
        "tableList": {
            "type": "Object",
            "defaultValue": [
                {
                    "TABLE_NAME": "customer_table",
                    "WaterMark_Column": "LastModifytime",
                    "TableType": "DataTypeforCustomerTable",
                    "StoredProcedureNameForMergeOperation": "sp_upsert_customer_table"
                },
                {
                    "TABLE_NAME": "project_table",
                    "WaterMark_Column": "Creationtime",
                    "TableType": "DataTypeforProjectTable",
                    "StoredProcedureNameForMergeOperation": "sp_upsert_project_table"
                }
            ]
        }
    }
}
}

В моей таблице есть столбец, который различает разные компании, и поэтому я хотел бы добавить еще один параметр в этот конвейер.У меня есть такая таблица:

NAME    LASTMODIFY                 COMPANY
John    2015-01-01 00:00:00.000    1
Mike    2016-02-02 01:23:00.000    2
Andy    2017-03-04 05:16:00.000    3
Annie   2018-09-08 00:00:00.000    1

Кто-то знает, как вставить параметр в конвейер, чтобы указать, какую компанию копировать, а какую не копировать?

Есть ли какие-либопредложение?Спасибо всем заранее!

1 Ответ

0 голосов
/ 04 января 2019

Не совсем понятно, о чем вы спрашиваете, поэтому извиняюсь, если я пропускаю отметку, но:

Копирование допускает хранимую процедуру, которую вы можете использовать для потенциального решения вашей проблемы. Взгляните на этот пример: https://docs.microsoft.com/en-us/azure/data-factory/connector-sql-server#invoking-stored-procedure-for-sql-sink

Он использует хранимую процедуру, чтобы MERGE выполнить UPDATE или INSERT, в зависимости от соответствия JOIN. Это также позволяет передавать параметры.

Так что, если вы пытаетесь скопировать только определенные случаи на основе параметра, объединение MERGE может помочь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...