I have a performance problem with Spark on AWS EMR when using window functions to aggregate a 6M-record dataset. A sample of the data:
+--------------------+-------------------+---------------+--------------------+-------------------+----------+--------------+---------+
| employerName|employerLegalStatus|employerAddress| _metaData| naicCode| _jobTitle|_worksiteState|_uniqueId|
+--------------------+-------------------+---------------+--------------------+-------------------+----------+--------------+---------+
| Advanced Technology| Inc.| [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title1| ST1| 1|
| Advanced Technology| Inc.| [state -> NJ]|[/, test.xlsx, TE...|[2234, 1, code 2,,]|Job Title1| ST2| 2|
| Advanced Technology| Inc.| [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title2| ST1| 3|
| Advanced Technology| Inc.| [state ->]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title1| ST3| 4|
I'm trying to deduplicate the data by merging "similar" records. In the main iteration I find all records that match each other and point each of them at the most recent record by adding a _parentId field (which is the _uniqueId of that most recent record). The result looks like this (a simplified sketch of the parent-pointer step follows the table):
+--------------------+-------------------+---------------+--------------------+-------------------+----------+--------------+---------+---------------------+--------------------+-----------------+-------------+------------------------+----------------+------------------------+-----------------+----------------+-----------+---------------------+---------------------------+-------------+------+----------+------------------+---------+
| employerName|employerLegalStatus|employerAddress| _metaData| naicCode| _jobTitle|_worksiteState|_uniqueId|__expectedCombination| _id|_reviewedCategory|_toBeReviewed|petitionCountPerVisaType|organizationFlag|potentialVisaSponsorship|otherEmployerName|requestTypeCount|primaryCrop|natureOfTemporaryNeed|petitionCountPerCitizenship|employerPhone|lawyer|primarySub|totalPetitionCount|_parentId|
+--------------------+-------------------+---------------+--------------------+-------------------+----------+--------------+---------+---------------------+--------------------+-----------------+-------------+------------------------+----------------+------------------------+-----------------+----------------+-----------+---------------------+---------------------------+-------------+------+----------+------------------+---------+
| Advanced Technology| Inc.| [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title1| ST1| 1| [,]|[5bbce91bdec23c60...| null| false| 1| null| null| null| 1| null| null| 1| null| null| null| 1| 1|
| Advanced Technology| Inc.| [state -> NJ]|[/, test.xlsx, TE...|[2234, 1, code 2,,]|Job Title1| ST2| 2| [1,]|[5bbce91bdec23c60...| null| false| 1| null| null| null| 1| null| null| 1| null| null| null| 1| 1|
| Advanced Technology| Inc.| [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title2| ST1| 3| [1,]|[5bbce91bdec23c60...| null| false| 1| null| null| null| 1| null| null| 1| null| null| null| 1| 1|
| Imerys Clays| Inc.| [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title2| ST1| 9| [,]|[5bbce91bdec23c60...| null| false| 1| null| null| null| 1| null| null| 1| null| null| null| 1| 9|
| Imerys Clays| Inc.| [state -> NJ]|[/, test.xlsx, TE...|[3234, 1, code 3,,]|Job Title1| ST3| 10| [9,]|[5bbce91bdec23c60...| null| false| 1| null| null| null| 1| null| null| 1| null| null| null| 1| 9|
| Imerys Clays| Inc.| [state -> NJ]|[/, test.xlsx, TE...|[3234, 1, code 3,,]|Job Title2| ST1| 8| [9,]|[5bbce91bdec23c60...| null| false| 1| null| null| null| 1| null| null| 1| null| null| null| 1| 9|
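For illustration only: if "similar" meant nothing more than equal employerName, the parent-pointer step could be sketched like this. The names are placeholders, and max(_uniqueId) merely stands in for whatever "most recent" means in the real matching logic:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // `spark` is the usual SparkSession

// Hypothetical simplification: one group per employerName, with the highest
// _uniqueId playing the role of the most recent record.
val matchWindow = Window.partitionBy($"employerName")

val withParent = records // `records` is a placeholder for the input DataFrame
  .withColumn("_parentId", max($"_uniqueId").over(matchWindow))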
Next I want to group together all records that share the same _parentId, using window functions, because I want to keep the parent record as-is and merge only certain fields into lists. I could use groupBy, but then I would lose the ordering information and would have to unpack the grouped list of records into a new record.
// One window partition per parent; no orderBy, so the frame spans the whole partition.
val mergeWindow = Window
  .partitionBy(MetaData.Field.parentId.col)

// Pass-through columns kept as-is.
val cols = originalFields.toList.map(col)

combined
  .select(
    collect_list($"_metaData.path").over(mergeWindow).as("__metaDataPaths") ::
      collect_list('_id).over(mergeWindow).as("__ids") ::
      collect_list('_uniqueId).over(mergeWindow).as("__uniqueIds") ::
      collect_list('naicCode).over(mergeWindow).as("__naicCodes") ::
      collect_list('_jobTitle).over(mergeWindow).as("jobTitles") ::
      collect_list('_worksiteState).over(mergeWindow).as("worksiteStates") ::
      collect_list('employerAddress).over(mergeWindow).as("__otherEmployerAddress") ::
      collect_list('requestTypeCount).over(mergeWindow).as("__requestTypeCount") ::
      collect_list('petitionCountPerVisaType).over(mergeWindow).as("__petitionCountPerVisaType") ::
      collect_list('petitionCountPerCitizenship).over(mergeWindow).as("__petitionCountPerCitizenship") ::
      count('totalPetitionCount).over(mergeWindow).as("totalPetitionCount") ::
      cols: _*
  )
  // Keep only the parent row of each group.
  .filter('_uniqueId === '_parentId)
So some fields I keep as-is, and some I collect into lists (for further processing).

The plan looks like this:
== Physical Plan ==
*(3) Project [__metaDataPaths#4962, __ids#4964, __uniqueIds#4966, __naicCodes#4968, jobTitles#4970, worksiteStates#4972, __otherEmployerAddress#4974, __requestTypeCount#4976, __petitionCountPerVisaType#4978, __petitionCountPerCitizenship#4980, totalPetitionCount#4983L, naicCode#4495, _jobTitle#4496, __expectedCombination#4499, _id#4500, _worksiteState#4497, _parentId#4515L, _metaData#4494, _toBeReviewed#4502, _uniqueId#4498L, employerName#4491, employerAddress#4493, employerLegalStatus#4492, potentialVisaSponsorship#4944, ... 8 more fields]
+- *(3) Filter (isnotnull(_uniqueId#4498L) && (_uniqueId#4498L = _parentId#4515L))
+- Window [collect_list(_metaData#4494.originalFilePath, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __metaDataPaths#4962, collect_list(_id#4500, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __ids#4964, collect_list(_uniqueId#4498L, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __uniqueIds#4966, collect_list(naicCode#4495, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __naicCodes#4968, collect_list(_jobTitle#4496, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS jobTitles#4970, collect_list(_worksiteState#4497, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS worksiteStates#4972, collect_list(employerAddress#4493, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __otherEmployerAddress#4974, collect_list(requestTypeCount#4507, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __requestTypeCount#4976, collect_list(petitionCountPerVisaType#4503, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __petitionCountPerVisaType#4978, collect_list(petitionCountPerCitizenship#4510, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __petitionCountPerCitizenship#4980, count(totalPetitionCount#4514) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS totalPetitionCount#4983L, collect_list(potentialVisaSponsorship#4505, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS potentialVisaSponsorship#4944, collect_list(_reviewedCategory#4501, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _reviewedCategory#4946, collect_list(otherEmployerName#4506, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS otherEmployerName#4958, collect_list(employerPhone#4511, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS employerPhone#4956, collect_list(lawyer#4512, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS lawyer#4960, collect_list(organizationFlag#4504, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS organizationFlag#4952, collect_list(primarySub#4513, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS primarySub#4948, collect_list(primaryCrop#4508, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS primaryCrop#4950, collect_list(natureOfTemporaryNeed#4509, 0, 0) windowspecdefinition(_parentId#4515L, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS natureOfTemporaryNeed#4954], [_parentId#4515L]
+- *(2) Sort [_parentId#4515L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(_parentId#4515L, 1000)
+- *(1) Project [naicCode#4495, _jobTitle#4496, __expectedCombination#4499, _id#4500, _worksiteState#4497, _parentId#4515L, _metaData#4494, _toBeReviewed#4502, _uniqueId#4498L, employerName#4491, employerAddress#4493, employerLegalStatus#4492, requestTypeCount#4507, petitionCountPerVisaType#4503, petitionCountPerCitizenship#4510, totalPetitionCount#4514, potentialVisaSponsorship#4505, _reviewedCategory#4501, otherEmployerName#4506, employerPhone#4511, lawyer#4512, organizationFlag#4504, primarySub#4513, primaryCrop#4508, natureOfTemporaryNeed#4509]
+- *(1) Filter isnotnull(_parentId#4515L)
+- *(1) FileScan parquet default.combinationrule4[employerName#4491,employerLegalStatus#4492,employerAddress#4493,_metaData#4494,naicCode#4495,_jobTitle#4496,_worksiteState#4497,_uniqueId#4498L,__expectedCombination#4499,_id#4500,_reviewedCategory#4501,_toBeReviewed#4502,petitionCountPerVisaType#4503,organizationFlag#4504,potentialVisaSponsorship#4505,otherEmployerName#4506,requestTypeCount#4507,primaryCrop#4508,natureOfTemporaryNeed#4509,petitionCountPerCitizenship#4510,employerPhone#4511,lawyer#4512,primarySub#4513,totalPetitionCount#4514,_parentId#4515L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/Users/tlous/Development/Scala/importer-pipeline/spark-warehouse/combinati..., PartitionFilters: [], PushedFilters: [IsNotNull(_parentId)], ReadSchema: struct<employerName:string,employerLegalStatus:string,employerAddress:map<string,string>,_metaDat...
In the end this should yield about 1M records. However, there is skew: some records have 10,000+ matches, others one or none. The job completes for a few thousand records with little skew, but on a 16-core, 5-node EMR cluster it takes more than 5 hours. So, not good.
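One thing I've wondered about is whether salting the hot _parentId keys and aggregating in two stages would spread the skew. A rough sketch, only for the list fields (the salt column and the bucket count of 32 are made up, flatten needs Spark 2.4+, and the parent row would still have to be joined back):

import org.apache.spark.sql.functions._

// Stage 1: aggregate per (parentId, salt) so no single task materializes a hot key.
val salted = combined.withColumn("_salt", (rand() * 32).cast("int"))

val partial = salted
  .groupBy($"_parentId", $"_salt")
  .agg(
    collect_list($"_jobTitle").as("jobTitlesPart"),
    count($"totalPetitionCount").as("countPart")
  )

// Stage 2: merge the partial aggregates; now at most 32 rows per parent.
val merged = partial
  .groupBy($"_parentId")
  .agg(
    flatten(collect_list($"jobTitlesPart")).as("jobTitles"), // flatten: Spark 2.4+
    sum($"countPart").as("totalPetitionCount")
  )

I'm not sure how much this buys for collect_list, though, since the merged lists are just as large in the end.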
How can I optimize this query? Should I use groupBy instead, and if so, how do I select the most recent record from all the grouped fields (my best attempt is sketched below)? Is there a smarter way?
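For context, the groupBy variant I had in mind packs the parent row into a struct so that max() (which ignores nulls) keeps exactly the row whose _uniqueId equals _parentId, while the mergeable fields are collected as lists alongside. Column list trimmed for brevity:

import org.apache.spark.sql.functions._

val viaGroupBy = combined
  .groupBy($"_parentId")
  .agg(
    // Only the parent row yields a non-null struct, so max() selects it.
    max(when($"_uniqueId" === $"_parentId",
      struct($"_uniqueId", $"employerName", $"employerLegalStatus", $"employerAddress", $"_metaData")
    )).as("parent"),
    collect_list($"_jobTitle").as("jobTitles"),
    collect_list($"_worksiteState").as("worksiteStates"),
    count($"totalPetitionCount").as("totalPetitionCount")
  )
  .select($"parent.*", $"jobTitles", $"worksiteStates", $"totalPetitionCount")

But I don't know whether this is actually cheaper than the window version, or how it behaves under the same skew.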