Операция Azure DataFactory ForEach Copy не выполняет итерацию, а извлекает все файлы в BLOB-объектах. Зачем? - PullRequest
0 голосов
/ 21 мая 2019

У меня есть конвейер в DF2, который должен просматривать папку в BLOB-объекте и последовательно обрабатывать каждый из 145 файлов в таблице базы данных.После загрузки каждого файла в таблицу должна быть запущена хранимая процедура, которая проверит каждую запись и либо вставит ее, либо обновит существующую запись в основной таблице.

Глядя в онлайн, я чувствую, что пробовал каждую комбинацию активаций «Получить метаданные», «Для каждого», «LookUp» и «Присвоить переменную», но по какой-то причине мои данные копирования все равно выбираютсявсе файлы одновременно и работает 145 раз.

Недавно в Интернете нашел блог, за которым я следовал, чтобы использовать «Назначить переменную», так как он будет полезен для нескольких местоположений файлов, но он не работает для меня.Мне нужно читать файлы как таблицы CSV в таблицы, а не двоичные объекты, поэтому я считаю, что это моя проблема.

    {
        "name": "BulkLoadPipeline",
        "properties": {
            "activities": [
                {
                    "name": "GetFileNames",
                    "type": "GetMetadata",
                    "policy": {
                        "timeout": "7.00:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "typeProperties": {
                        "dataset": {
                            "referenceName": "DelimitedText1",
                            "type": "DatasetReference",
                            "parameters": {
                                "fileName": "@item()"
                            }
                        },
                        "fieldList": [
                            "childItems"
                        ],
                        "storeSettings": {
                            "type": "AzureBlobStorageReadSetting"
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSetting"
                        }
                    }
                },
                {
                    "name": "CopyDataRunDeltaCheck",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "BuildList",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "typeProperties": {
                        "items": {
                            "value": "@variables('fileList')",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "WriteToTables",
                                "type": "Copy",
                                "policy": {
                                    "timeout": "7.00:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "typeProperties": {
                                    "source": {
                                        "type": "DelimitedTextSource",
                                        "storeSettings": {
                                            "type": "AzureBlobStorageReadSetting",
                                            "wildcardFileName": "*.*"
                                        },
                                        "formatSettings": {
                                            "type": "DelimitedTextReadSetting"
                                        }
                                    },
                                    "sink": {
                                        "type": "AzureSqlSink"
                                    },
                                    "enableStaging": false,
                                    "translator": {
                                        "type": "TabularTranslator",
                                        "mappings": [
                                            {
                                                "source": {
                                                    "name": "myID",
                                                    "type": "String"
                                                },
                                                "sink": {
                                                    "name": "myID",
                                                    "type": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "Col1",
                                                    "type": "String"
                                                },
                                                "sink": {
                                                    "name": "Col1",
                                                    "type": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "Col2",
                                                    "type": "String"
                                                },
                                                "sink": {
                                                    "name": "Col2",
                                                    "type": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "Col3",
                                                    "type": "String"
                                                },
                                                "sink": {
                                                    "name": "Col3",
                                                    "type": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "Col4",
                                                    "type": "String"
                                                },
                                                "sink": {
                                                    "name": "Col4",
                                                    "type": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "DW Date Created",
                                                    "type": "String"
                                                },
                                                "sink": {
                                                    "name": "DW_Date_Created",
                                                    "type": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "DW Date Updated",
                                                    "type": "String"
                                                },
                                                "sink": {
                                                    "name": "DW_Date_Updated",
                                                    "type": "String"
                                                }
                                            }
                                        ]
                                    }
                                },
                                "inputs": [
                                    {
                                        "referenceName": "DelimitedText1",
                                        "type": "DatasetReference",
                                        "parameters": {
                                            "fileName": "@item()"
                                        }
                                    }
                                ],
                                "outputs": [
                                    {
                                        "referenceName": "myTable",
                                        "type": "DatasetReference"
                                    }
                                ]
                            },
                            {
                                "name": "CheckDeltas",
                                "type": "SqlServerStoredProcedure",
                                "dependsOn": [
                                    {
                                        "activity": "WriteToTables",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy": {
                                    "timeout": "7.00:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "typeProperties": {
                                    "storedProcedureName": "[TL].[uspMyCheck]"
                                },
                                "linkedServiceName": {
                                    "referenceName": "myService",
                                    "type": "LinkedServiceReference"
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "BuildList",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "GetFileNames",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('GetFileNames').output.childItems",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Create list from variables",
                                "type": "AppendVariable",
                                "typeProperties": {
                                    "variableName": "fileList",
                                    "value": "@item().name"
                                }
                            }
                        ]
                    }
                }
            ],
            "variables": {
                "fileList": {
                    "type": "Array"
                }
            }
        }
    }

Экран «Сведения» в выводе пиплелина показывает конвейерные циклы для количества элементов в BLOB-объекте, но каждый раз, когда Копирование данных и хранимая процедура запускаются для каждого файла в списке одновременно, в отличие отпо одному за раз.

Мне кажется, я близок к ответу, но мне не хватает одной важной части.Любая помощь или предложения приветствуются.

1 Ответ

0 голосов
/ 22 мая 2019

Ваша полезная нагрузка неверна.

  1. GetMetadata actvitiy не должен использовать тот же набор данных с копией деятельности.
  2. Деятельность GetMetadata должна ссылаться на набор данных с папкой, папка содержит все файлы, с которыми вы хотите иметь дело. но ваш набор данных имеет параметр 'filename'.
  3. использовать выходные данные действия getMetadata в качестве входных данных для действия forEach. childItems
...