У меня есть конвейер в DF2, который должен просматривать папку в BLOB-объекте и последовательно обрабатывать каждый из 145 файлов в таблице базы данных.После загрузки каждого файла в таблицу должна быть запущена хранимая процедура, которая проверит каждую запись и либо вставит ее, либо обновит существующую запись в основной таблице.
Глядя в онлайн, я чувствую, что пробовал каждую комбинацию активаций «Получить метаданные», «Для каждого», «LookUp» и «Присвоить переменную», но по какой-то причине мои данные копирования все равно выбираютсявсе файлы одновременно и работает 145 раз.
Недавно в Интернете нашел блог, за которым я следовал, чтобы использовать «Назначить переменную», так как он будет полезен для нескольких местоположений файлов, но он не работает для меня.Мне нужно читать файлы как таблицы CSV в таблицы, а не двоичные объекты, поэтому я считаю, что это моя проблема.
{
"name": "BulkLoadPipeline",
"properties": {
"activities": [
{
"name": "GetFileNames",
"type": "GetMetadata",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"dataset": {
"referenceName": "DelimitedText1",
"type": "DatasetReference",
"parameters": {
"fileName": "@item()"
}
},
"fieldList": [
"childItems"
],
"storeSettings": {
"type": "AzureBlobStorageReadSetting"
},
"formatSettings": {
"type": "DelimitedTextReadSetting"
}
}
},
{
"name": "CopyDataRunDeltaCheck",
"type": "ForEach",
"dependsOn": [
{
"activity": "BuildList",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@variables('fileList')",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "WriteToTables",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "AzureBlobStorageReadSetting",
"wildcardFileName": "*.*"
},
"formatSettings": {
"type": "DelimitedTextReadSetting"
}
},
"sink": {
"type": "AzureSqlSink"
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"mappings": [
{
"source": {
"name": "myID",
"type": "String"
},
"sink": {
"name": "myID",
"type": "String"
}
},
{
"source": {
"name": "Col1",
"type": "String"
},
"sink": {
"name": "Col1",
"type": "String"
}
},
{
"source": {
"name": "Col2",
"type": "String"
},
"sink": {
"name": "Col2",
"type": "String"
}
},
{
"source": {
"name": "Col3",
"type": "String"
},
"sink": {
"name": "Col3",
"type": "String"
}
},
{
"source": {
"name": "Col4",
"type": "String"
},
"sink": {
"name": "Col4",
"type": "String"
}
},
{
"source": {
"name": "DW Date Created",
"type": "String"
},
"sink": {
"name": "DW_Date_Created",
"type": "String"
}
},
{
"source": {
"name": "DW Date Updated",
"type": "String"
},
"sink": {
"name": "DW_Date_Updated",
"type": "String"
}
}
]
}
},
"inputs": [
{
"referenceName": "DelimitedText1",
"type": "DatasetReference",
"parameters": {
"fileName": "@item()"
}
}
],
"outputs": [
{
"referenceName": "myTable",
"type": "DatasetReference"
}
]
},
{
"name": "CheckDeltas",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "WriteToTables",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"storedProcedureName": "[TL].[uspMyCheck]"
},
"linkedServiceName": {
"referenceName": "myService",
"type": "LinkedServiceReference"
}
}
]
}
},
{
"name": "BuildList",
"type": "ForEach",
"dependsOn": [
{
"activity": "GetFileNames",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('GetFileNames').output.childItems",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Create list from variables",
"type": "AppendVariable",
"typeProperties": {
"variableName": "fileList",
"value": "@item().name"
}
}
]
}
}
],
"variables": {
"fileList": {
"type": "Array"
}
}
}
}
Экран «Сведения» в выводе пиплелина показывает конвейерные циклы для количества элементов в BLOB-объекте, но каждый раз, когда Копирование данных и хранимая процедура запускаются для каждого файла в списке одновременно, в отличие отпо одному за раз.
Мне кажется, я близок к ответу, но мне не хватает одной важной части.Любая помощь или предложения приветствуются.