Azure поток данных фабрики данных CSV-схема дрейфует к паркету. c столбцы удаления назначения Является ли это возможным? - PullRequest
0 голосов
/ 27 января 2020

Попытка записать поток данных фабрики данных Azure, который будет обрабатывать два одинаковых версионных файла CSV. Файл версии 1 имеет 48 столбцов. Файл версии 2 содержит 50 столбцов - те же 48 столбцов, что и версия 1, но в конце добавлены 2 дополнительных столбца. Я хотел бы создать целевой файл паркета, который содержит все 50 столбцов для загрузки в мой SQLDW через polybase. Исторически у нас было более 6 тысяч файлов в одном источнике BLOB-объектов, и не было простого способа определить файлы с 48 столбцами по сравнению с 50 столбцами. Ниже приведено наиболее близкое решение, которое я нашел.

  1. Источник CSV с разрешенным дрейфом схемы. В наборе данных CSV не определена схема
  2. MapDrifted производные столбцы - то есть toString (byName ('Manufacturer')) все 50 столбцов
  3. Sink - набор данных представляет собой паркет со схемой, определенной файлом шаблона паркета который содержит все 50 столбцов. Раздел Sink устанавливается по имени исходного файла. Каждый входящий файл будет иметь паркетный файл, сгенерированный на выходе.

Это решение работает с набором из двух тестовых файлов. Один с 48 столбцами и один с 50 столбцами. Два паркетных файла созданы с 50 столбцами. Один файл заполняется до 48-го столбца, другой файл заполняется всеми 50 столбцами. Если я добавлю больше исходных файлов с 48 столбцами для теста. Файл с 50 столбцами теряет последние два столбца данных и заканчивается только 48 столбцами? Я думал, что это будет общая проблема, которую может решить ADF. т.е. версия файла меняется со временем. Какие-либо предложения? Ниже приведен сценарий моего ADF

source(allowSchemaDrift: true,
    validateSchema: false,
    rowUrlColumn: 'sourcefilename',
    inferDriftedColumnTypes: true,
    multiLineRow: true,
    wildcardPaths:['avail/archive_csv2/*.csv']) ~> SRCAvailCSV
SRCAvailCSV derive(Manufacturer = toString(byName('Manufacturer')),
        SKU = toString(byName('SKU')),
        {Partner Name} = toString(byName('Partner Name')),
        {Partner Part Number} = toString(byName('Partner Part Number')),
        {Search Date} = toString(byName('Search Date')),
        {Search Result Description} = toString(byName('Search Result Description')),
        {1st Line Description} = toString(byName('1st Line Description')),
        {2nd Line Description} = toString(byName('2nd Line Description')),
        {Product Category} = toString(byName('Product Category')),
        {Product Category 1} = toString(byName('Product Category 1')),
        {Product Category 2} = toString(byName('Product Category 2')),
        {Product Category 3} = toString(byName('Product Category 3')),
        {Product Category 4} = toString(byName('Product Category 4')),
        {UNSPSC Code} = toString(byName('UNSPSC Code')),
        Pricing = toString(byName('Pricing')),
        Currency = toString(byName('Currency')),
        {Availability Qty} = toString(byName('Availability Qty')),
        {Availability Status} = toString(byName('Availability Status')),
        {Average Rating} = toString(byName('Average Rating')),
        {Total Reviews} = toString(byName('Total Reviews')),
        Brand = toString(byName('Brand')),
        Model = toString(byName('Model')),
        {Product Line} = toString(byName('Product Line')),
        {Partner Site} = toString(byName('Partner Site')),
        {Product URL} = toString(byName('Product URL')),
        Warranty = toString(byName('Warranty')),
        {Product Length} = toString(byName('Product Length')),
        {Product Width} = toString(byName('Product Width')),
        {Product Height} = toString(byName('Product Height')),
        {Product Depth} = toString(byName('Product Depth')),
        {Product Weight} = toString(byName('Product Weight')),
        {Fullfilling Partner} = toString(byName('Fullfilling Partner')),
        {Date First Available} = toString(byName('Date First Available')),
        {Frequently Bought Together 1} = toString(byName('Frequently Bought Together 1')),
        {Frequently Bought Together 1 Part Number} = toString(byName('Frequently Bought Together 1 Part Number')),
        {Frequently Bought Together 2} = toString(byName('Frequently Bought Together 2')),
        {Frequently Bought Together 2 Part Number} = toString(byName('Frequently Bought Together 2 Part Number')),
        {Frequently Bought Together 3} = toString(byName('Frequently Bought Together 3')),
        {Frequently Bought Together 3 Part Number} = toString(byName('Frequently Bought Together 3 Part Number')),
        {Frequently Bought Together 4} = toString(byName('Frequently Bought Together 4')),
        {Frequently Bought Together 4 Part Number} = toString(byName('Frequently Bought Together 4 Part Number')),
        {From the Manufacturer} = toString(byName('From the Manufacturer')),
        {Bestesellers Rank 1} = toString(byName('Bestesellers Rank 1')),
        {Bestsellers Rank 2} = toString(byName('Bestsellers Rank 2')),
        {Bestsellers Rank 3} = toString(byName('Bestsellers Rank 3')),
        {Bestsellers Rank 4} = toString(byName('Bestsellers Rank 4')),
        Endpoint = toString(byName('Endpoint')),
        {Related StarTech.com SKU} = toString(byName('Related StarTech.com SKU')),
        {Search SKU} = toString(byName('Search SKU')),
        {Search Manufacturer} = toString(byName('Search Manufacturer')),
        sourcefilename = sourcefilename) ~> MapDrifted1
MapDrifted1 sink(input(
        FileName as string,
        Manufacturer as string,
        SKU as string,
        PartnerName as string,
        PartnerPartNumber as string,
        SearchDate as string,
        SearchResultDescription as string,
        {1stLineDescription} as string,
        {2ndLineDescription} as string,
        ProductCategory as string,
        ProductCategory1 as string,
        ProductCategory2 as string,
        ProductCategory3 as string,
        ProductCategory4 as string,
        UNSPSCCode as string,
        Pricing as string,
        Currency as string,
        AvailabilityQty as string,
        AvailabilityStatus as string,
        AverageRating as string,
        TotalReviews as string,
        Brand as string,
        Model as string,
        ProductLine as string,
        PartnerSite as string,
        ProductURL as string,
        Warranty as string,
        ProductLength as string,
        ProductWidth as string,
        ProductHeight as string,
        ProductDepth as string,
        ProductWeight as string,
        FullfillingPartner as string,
        DateFirstAvailable as string,
        FrequentlyBoughtTogether1 as string,
        FrequentlyBoughtTogether1PartNumber as string,
        FrequentlyBoughtTogether2 as string,
        FrequentlyBoughtTogether2PartNumber as string,
        FrequentlyBoughtTogether3 as string,
        FrequentlyBoughtTogether3PartNumber as string,
        FrequentlyBoughtTogether4 as string,
        FrequentlyBoughtTogether4PartNumber as string,
        FromtheManufacturer as string,
        BestesellersRank1 as string,
        BestsellersRank2 as string,
        BestsellersRank3 as string,
        BestsellersRank4 as string,
        Endpoint as string,
        RelatedStarTechcomSKU as string,
        SearchSKU as string,
        SearchManufacturer as string
    ),
    allowSchemaDrift: false,
    validateSchema: false,
    format: 'parquet',
    rowUrlColumn:'sourcefilename',
    mapColumn(
        FileName = sourcefilename,
        Manufacturer,
        SKU,
        PartnerName = {Partner Name},
        PartnerPartNumber = {Partner Part Number},
        SearchDate = {Search Date},
        SearchResultDescription = {Search Result Description},
        {1stLineDescription} = {1st Line Description},
        {2ndLineDescription} = {2nd Line Description},
        ProductCategory = {Product Category},
        ProductCategory1 = {Product Category 1},
        ProductCategory2 = {Product Category 2},
        ProductCategory3 = {Product Category 3},
        ProductCategory4 = {Product Category 4},
        UNSPSCCode = {UNSPSC Code},
        Pricing,
        Currency,
        AvailabilityQty = {Availability Qty},
        AvailabilityStatus = {Availability Status},
        AverageRating = {Average Rating},
        TotalReviews = {Total Reviews},
        Brand,
        Model,
        ProductLine = {Product Line},
        PartnerSite = {Partner Site},
        ProductURL = {Product URL},
        Warranty,
        ProductLength = {Product Length},
        ProductWidth = {Product Width},
        ProductHeight = {Product Height},
        ProductDepth = {Product Depth},
        ProductWeight = {Product Weight},
        FullfillingPartner = {Fullfilling Partner},
        DateFirstAvailable = {Date First Available},
        FrequentlyBoughtTogether1 = {Frequently Bought Together 1},
        FrequentlyBoughtTogether1PartNumber = {Frequently Bought Together 1 Part Number},
        FrequentlyBoughtTogether2 = {Frequently Bought Together 2},
        FrequentlyBoughtTogether2PartNumber = {Frequently Bought Together 2 Part Number},
        FrequentlyBoughtTogether3 = {Frequently Bought Together 3},
        FrequentlyBoughtTogether3PartNumber = {Frequently Bought Together 3 Part Number},
        FrequentlyBoughtTogether4 = {Frequently Bought Together 4},
        FrequentlyBoughtTogether4PartNumber = {Frequently Bought Together 4 Part Number},
        FromtheManufacturer = {From the Manufacturer},
        BestesellersRank1 = {Bestesellers Rank 1},
        BestsellersRank2 = {Bestsellers Rank 2},
        BestsellersRank3 = {Bestsellers Rank 3},
        BestsellersRank4 = {Bestsellers Rank 4},
        Endpoint,
        RelatedStarTechcomSKU = {Related StarTech.com SKU},
        SearchSKU = {Search SKU},
        SearchManufacturer = {Search Manufacturer}
    )) ~> sink1

Ответы [ 2 ]

0 голосов
/ 04 февраля 2020

Я собрал очень быструю, очень простую демонстрацию: http://youtu.be/DhscTC6VwwI?hd=1.

Я добавил ее в нашу коллекцию видео потоков данных ADF, потому что это общий шаблон и рычаги дрейф схемы и гибкая обработка схемы.

Ключом к выполнению этой работы является построение выходной модели в потоке данных с производным столбцом. Держите ваши наборы данных без схемы.

Дайте мне знать, если у вас есть какие-либо вопросы.

0 голосов
/ 27 января 2020

Вы всегда wi sh выводите файл Parquet с той же схемой? Т.е. 50 столбцов, независимо от схемы входящего файла?

Если это так, то вы можете создать поток данных с "канонической моделью", которая определяет эту структуру из 50 столбцов.

Вы создаст определение целевой схемы с использованием производного столбца и отобразит там входящие исходные столбцы. Если у вас нет соответствующего столбца, вы можете просто установить значение NULL.

При использовании этого метода вам не нужно будет определять формат набора данных в приемнике. Вы можете просто использовать Auto Map с пустым набором данных и выводить файлы Parquet.

Схема вывода файла Parquet будет соответствовать вашей модели Derived Column, которая будет определять понятные псевдонимы, которые вы использовали в отображении Sink выше.

Вот видео, которое я сделал, чтобы помочь объяснить этот метод: https://www.youtube.com/watch?v=K5tgzLjEE9Q.

Надеюсь, это поможет.

...