Попытка записать поток данных фабрики данных Azure, который будет обрабатывать два одинаковых версионных файла CSV. Файл версии 1 имеет 48 столбцов. Файл версии 2 содержит 50 столбцов - те же 48 столбцов, что и версия 1, но в конце добавлены 2 дополнительных столбца. Я хотел бы создать целевой файл паркета, который содержит все 50 столбцов для загрузки в мой SQLDW через polybase. Исторически у нас было более 6 тысяч файлов в одном источнике BLOB-объектов, и не было простого способа определить файлы с 48 столбцами по сравнению с 50 столбцами. Ниже приведено наиболее близкое решение, которое я нашел.
- Источник CSV с разрешенным дрейфом схемы. В наборе данных CSV не определена схема
- MapDrifted производные столбцы - то есть toString (byName ('Manufacturer')) все 50 столбцов
- Sink - набор данных представляет собой паркет со схемой, определенной файлом шаблона паркета который содержит все 50 столбцов. Раздел Sink устанавливается по имени исходного файла. Каждый входящий файл будет иметь паркетный файл, сгенерированный на выходе.
Это решение работает с набором из двух тестовых файлов. Один с 48 столбцами и один с 50 столбцами. Два паркетных файла созданы с 50 столбцами. Один файл заполняется до 48-го столбца, другой файл заполняется всеми 50 столбцами. Если я добавлю больше исходных файлов с 48 столбцами для теста. Файл с 50 столбцами теряет последние два столбца данных и заканчивается только 48 столбцами? Я думал, что это будет общая проблема, которую может решить ADF. т.е. версия файла меняется со временем. Какие-либо предложения? Ниже приведен сценарий моего ADF
source(allowSchemaDrift: true,
validateSchema: false,
rowUrlColumn: 'sourcefilename',
inferDriftedColumnTypes: true,
multiLineRow: true,
wildcardPaths:['avail/archive_csv2/*.csv']) ~> SRCAvailCSV
SRCAvailCSV derive(Manufacturer = toString(byName('Manufacturer')),
SKU = toString(byName('SKU')),
{Partner Name} = toString(byName('Partner Name')),
{Partner Part Number} = toString(byName('Partner Part Number')),
{Search Date} = toString(byName('Search Date')),
{Search Result Description} = toString(byName('Search Result Description')),
{1st Line Description} = toString(byName('1st Line Description')),
{2nd Line Description} = toString(byName('2nd Line Description')),
{Product Category} = toString(byName('Product Category')),
{Product Category 1} = toString(byName('Product Category 1')),
{Product Category 2} = toString(byName('Product Category 2')),
{Product Category 3} = toString(byName('Product Category 3')),
{Product Category 4} = toString(byName('Product Category 4')),
{UNSPSC Code} = toString(byName('UNSPSC Code')),
Pricing = toString(byName('Pricing')),
Currency = toString(byName('Currency')),
{Availability Qty} = toString(byName('Availability Qty')),
{Availability Status} = toString(byName('Availability Status')),
{Average Rating} = toString(byName('Average Rating')),
{Total Reviews} = toString(byName('Total Reviews')),
Brand = toString(byName('Brand')),
Model = toString(byName('Model')),
{Product Line} = toString(byName('Product Line')),
{Partner Site} = toString(byName('Partner Site')),
{Product URL} = toString(byName('Product URL')),
Warranty = toString(byName('Warranty')),
{Product Length} = toString(byName('Product Length')),
{Product Width} = toString(byName('Product Width')),
{Product Height} = toString(byName('Product Height')),
{Product Depth} = toString(byName('Product Depth')),
{Product Weight} = toString(byName('Product Weight')),
{Fullfilling Partner} = toString(byName('Fullfilling Partner')),
{Date First Available} = toString(byName('Date First Available')),
{Frequently Bought Together 1} = toString(byName('Frequently Bought Together 1')),
{Frequently Bought Together 1 Part Number} = toString(byName('Frequently Bought Together 1 Part Number')),
{Frequently Bought Together 2} = toString(byName('Frequently Bought Together 2')),
{Frequently Bought Together 2 Part Number} = toString(byName('Frequently Bought Together 2 Part Number')),
{Frequently Bought Together 3} = toString(byName('Frequently Bought Together 3')),
{Frequently Bought Together 3 Part Number} = toString(byName('Frequently Bought Together 3 Part Number')),
{Frequently Bought Together 4} = toString(byName('Frequently Bought Together 4')),
{Frequently Bought Together 4 Part Number} = toString(byName('Frequently Bought Together 4 Part Number')),
{From the Manufacturer} = toString(byName('From the Manufacturer')),
{Bestesellers Rank 1} = toString(byName('Bestesellers Rank 1')),
{Bestsellers Rank 2} = toString(byName('Bestsellers Rank 2')),
{Bestsellers Rank 3} = toString(byName('Bestsellers Rank 3')),
{Bestsellers Rank 4} = toString(byName('Bestsellers Rank 4')),
Endpoint = toString(byName('Endpoint')),
{Related StarTech.com SKU} = toString(byName('Related StarTech.com SKU')),
{Search SKU} = toString(byName('Search SKU')),
{Search Manufacturer} = toString(byName('Search Manufacturer')),
sourcefilename = sourcefilename) ~> MapDrifted1
MapDrifted1 sink(input(
FileName as string,
Manufacturer as string,
SKU as string,
PartnerName as string,
PartnerPartNumber as string,
SearchDate as string,
SearchResultDescription as string,
{1stLineDescription} as string,
{2ndLineDescription} as string,
ProductCategory as string,
ProductCategory1 as string,
ProductCategory2 as string,
ProductCategory3 as string,
ProductCategory4 as string,
UNSPSCCode as string,
Pricing as string,
Currency as string,
AvailabilityQty as string,
AvailabilityStatus as string,
AverageRating as string,
TotalReviews as string,
Brand as string,
Model as string,
ProductLine as string,
PartnerSite as string,
ProductURL as string,
Warranty as string,
ProductLength as string,
ProductWidth as string,
ProductHeight as string,
ProductDepth as string,
ProductWeight as string,
FullfillingPartner as string,
DateFirstAvailable as string,
FrequentlyBoughtTogether1 as string,
FrequentlyBoughtTogether1PartNumber as string,
FrequentlyBoughtTogether2 as string,
FrequentlyBoughtTogether2PartNumber as string,
FrequentlyBoughtTogether3 as string,
FrequentlyBoughtTogether3PartNumber as string,
FrequentlyBoughtTogether4 as string,
FrequentlyBoughtTogether4PartNumber as string,
FromtheManufacturer as string,
BestesellersRank1 as string,
BestsellersRank2 as string,
BestsellersRank3 as string,
BestsellersRank4 as string,
Endpoint as string,
RelatedStarTechcomSKU as string,
SearchSKU as string,
SearchManufacturer as string
),
allowSchemaDrift: false,
validateSchema: false,
format: 'parquet',
rowUrlColumn:'sourcefilename',
mapColumn(
FileName = sourcefilename,
Manufacturer,
SKU,
PartnerName = {Partner Name},
PartnerPartNumber = {Partner Part Number},
SearchDate = {Search Date},
SearchResultDescription = {Search Result Description},
{1stLineDescription} = {1st Line Description},
{2ndLineDescription} = {2nd Line Description},
ProductCategory = {Product Category},
ProductCategory1 = {Product Category 1},
ProductCategory2 = {Product Category 2},
ProductCategory3 = {Product Category 3},
ProductCategory4 = {Product Category 4},
UNSPSCCode = {UNSPSC Code},
Pricing,
Currency,
AvailabilityQty = {Availability Qty},
AvailabilityStatus = {Availability Status},
AverageRating = {Average Rating},
TotalReviews = {Total Reviews},
Brand,
Model,
ProductLine = {Product Line},
PartnerSite = {Partner Site},
ProductURL = {Product URL},
Warranty,
ProductLength = {Product Length},
ProductWidth = {Product Width},
ProductHeight = {Product Height},
ProductDepth = {Product Depth},
ProductWeight = {Product Weight},
FullfillingPartner = {Fullfilling Partner},
DateFirstAvailable = {Date First Available},
FrequentlyBoughtTogether1 = {Frequently Bought Together 1},
FrequentlyBoughtTogether1PartNumber = {Frequently Bought Together 1 Part Number},
FrequentlyBoughtTogether2 = {Frequently Bought Together 2},
FrequentlyBoughtTogether2PartNumber = {Frequently Bought Together 2 Part Number},
FrequentlyBoughtTogether3 = {Frequently Bought Together 3},
FrequentlyBoughtTogether3PartNumber = {Frequently Bought Together 3 Part Number},
FrequentlyBoughtTogether4 = {Frequently Bought Together 4},
FrequentlyBoughtTogether4PartNumber = {Frequently Bought Together 4 Part Number},
FromtheManufacturer = {From the Manufacturer},
BestesellersRank1 = {Bestesellers Rank 1},
BestsellersRank2 = {Bestsellers Rank 2},
BestsellersRank3 = {Bestsellers Rank 3},
BestsellersRank4 = {Bestsellers Rank 4},
Endpoint,
RelatedStarTechcomSKU = {Related StarTech.com SKU},
SearchSKU = {Search SKU},
SearchManufacturer = {Search Manufacturer}
)) ~> sink1