Итак, у нас есть фабрика с ~ 400 наборами данных и ~ 200 конвейерами, и она становится громоздкой. Сосредоточение на копировании из исходного кода SQL в приемник BLOB-объектов. Поскольку мы копируем в BLOB-объект, схема не оказывает влияния. Я хотел бы иметь один набор данных для каждого источника, один набор данных для каждой учетной записи BLOB-объекта и один конвейер для каждой комбинации учетной записи источника / BLOB-объекта, динамически передавая ему конфигурацию из поиска.
Мы успешно разработали конвейер, который использует фиктивные наборы данных для источника и приемника. Работает, если вы отправите запрос, имя контейнера и имя папки.
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "DynamicCopy",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "select 1 a"
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "AzureSql",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob",
"type": "DatasetReference",
"parameters": {
"container": "raw-test",
"folder": "test"
}
}
]
}
]
}
}
Когда мы помещаем поиск перед ним и оборачиваем его в foreach, он перестает работать. С не очень полезным
"errorCode": "400",
"message": "Activity failed because an inner activity failed",
"failureType": "UserError",
"target": "ForEach"
Хранимая процедура поиска [dbo].[adfdynamic]
на самом деле еще не упоминается в foreach:
create proc adfdynamic as
select 'raw-test' container, 'test_a' folder, 'select 1 a, 2 b'
UNION ALL
select 'raw-test' container, 'test_b' folder, 'select 3 c, 2 d'
Итак, что я желал, это поведение:
- один BLOB-объект в raw-test @ .. myblob ... / test_a / out.dsv с содержимым
{'a,b','1,2'}
- один BLOB-объект в raw-test @ .. myblob ... / test_b / out.dsv с содержимым
{'c,d','3,2'}
набор данных sql:
{
"name": "AzureSql",
"properties": {
"linkedServiceName": {
"referenceName": "Dest",
"type": "LinkedServiceReference"
},
"type": "AzureSqlTable",
"structure": [
{
"name": "CustomerKey",
"type": "Int32"
},
{
"name": "Name",
"type": "String"
}
],
"typeProperties": {
"tableName": "[dbo].[DimCustomer]"
}
}
}
набор данных BLOB-объектов:
{
"name": "AzureBlob",
"properties": {
"linkedServiceName": {
"referenceName": "AzureStorage1",
"type": "LinkedServiceReference"
},
"parameters": {
"container": {
"type": "String"
},
"folder": {
"type": "String"
}
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "TextFormat",
"columnDelimiter": ",",
"treatEmptyAsNull": false,
"skipLineCount": 0,
"firstRowAsHeader": false
},
"fileName": {
"value": "@{dataset().folder}/out.dsv",
"type": "Expression"
},
"folderPath": {
"value": "@dataset().container",
"type": "Expression"
}
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
и нерабочий динамический конвейер:
{
"name": "Copy",
"properties": {
"activities": [
{
"name": "ForEach",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('Lookup').output.value",
"type": "Expression"
},
"activities": [
{
"name": "Copy",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "select 1 a, 2 b from dest",
"type": "Expression"
}
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "AzureSql",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob",
"type": "DatasetReference",
"parameters": {
"container": {
"value": "raw-test",
"type": "Expression"
},
"folder": {
"value": "folder",
"type": "Expression"
}
}
}
]
}
]
}
},
{
"name": "Lookup",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
}
}
]
}
}
Извинения за форматирование. слишком много кода в одном сообщении?