Проблема производительности Spark Window - PullRequest
0 голосов
/ 09 октября 2018

У меня проблема с производительностью Spark в AWS EMR, при использовании Windows для агрегирования данных в наборе данных 6M.

+--------------------+-------------------+---------------+--------------------+-------------------+----------+--------------+---------+
|        employerName|employerLegalStatus|employerAddress|           _metaData|           naicCode| _jobTitle|_worksiteState|_uniqueId|
+--------------------+-------------------+---------------+--------------------+-------------------+----------+--------------+---------+
| Advanced Technology|               Inc.|  [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title1|           ST1|        1|
| Advanced Technology|               Inc.|  [state -> NJ]|[/, test.xlsx, TE...|[2234, 1, code 2,,]|Job Title1|           ST2|        2|
| Advanced Technology|               Inc.|  [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title2|           ST1|        3|
| Advanced Technology|               Inc.|     [state ->]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title1|           ST3|        4|

Я пытаюсь дедуплицировать данные путем объединения «похожих» записей.

В основной итерации я нахожу все соответствующие записи, которые соответствуют друг другу, и указываю каждую из них на самую последнюю запись, добавляя поле _parentId (которое является _uniqueId самой последней записи).

+--------------------+-------------------+---------------+--------------------+-------------------+----------+--------------+---------+---------------------+--------------------+-----------------+-------------+------------------------+----------------+------------------------+-----------------+----------------+-----------+---------------------+---------------------------+-------------+------+----------+------------------+---------+
|        employerName|employerLegalStatus|employerAddress|           _metaData|           naicCode| _jobTitle|_worksiteState|_uniqueId|__expectedCombination|                 _id|_reviewedCategory|_toBeReviewed|petitionCountPerVisaType|organizationFlag|potentialVisaSponsorship|otherEmployerName|requestTypeCount|primaryCrop|natureOfTemporaryNeed|petitionCountPerCitizenship|employerPhone|lawyer|primarySub|totalPetitionCount|_parentId|
+--------------------+-------------------+---------------+--------------------+-------------------+----------+--------------+---------+---------------------+--------------------+-----------------+-------------+------------------------+----------------+------------------------+-----------------+----------------+-----------+---------------------+---------------------------+-------------+------+----------+------------------+---------+
| Advanced Technology|               Inc.|  [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title1|           ST1|        1|                  [,]|[5bbce91bdec23c60...|             null|        false|                       1|            null|                    null|             null|               1|       null|                 null|                          1|         null|  null|      null|                 1|        1|
| Advanced Technology|               Inc.|  [state -> NJ]|[/, test.xlsx, TE...|[2234, 1, code 2,,]|Job Title1|           ST2|        2|                 [1,]|[5bbce91bdec23c60...|             null|        false|                       1|            null|                    null|             null|               1|       null|                 null|                          1|         null|  null|      null|                 1|        1|
| Advanced Technology|               Inc.|  [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title2|           ST1|        3|                 [1,]|[5bbce91bdec23c60...|             null|        false|                       1|            null|                    null|             null|               1|       null|                 null|                          1|         null|  null|      null|                 1|        1|
|        Imerys Clays|               Inc.|  [state -> NJ]|[/, test.xlsx, TE...|[1234, 1, code 1,,]|Job Title2|           ST1|        9|                  [,]|[5bbce91bdec23c60...|             null|        false|                       1|            null|                    null|             null|               1|       null|                 null|                          1|         null|  null|      null|                 1|        9|
|        Imerys Clays|               Inc.|  [state -> NJ]|[/, test.xlsx, TE...|[3234, 1, code 3,,]|Job Title1|           ST3|       10|                 [9,]|[5bbce91bdec23c60...|             null|        false|                       1|            null|                    null|             null|               1|       null|                 null|                          1|         null|  null|      null|                 1|        9|
|        Imerys Clays|               Inc.|  [state -> NJ]|[/, test.xlsx, TE...|[3234, 1, code 3,,]|Job Title2|           ST1|        8|                 [9,]|[5bbce91bdec23c60...|             null|        false|                       1|            null|                    null|             null|               1|       null|                 null|                          1|         null|  null|      null|                 1|        9|

Далее я хочу сгруппировать все записи вместе с одним и тем же _parentId, используя windows, потому что я хочу сохранить родительскую запись «как есть» и объединять только определенные поля в виде списков.Я мог бы использовать groupBy, но тогда я потерял бы информацию о заказе и мне пришлось бы распаковать сгруппированный список записей в новую запись.

val mergeWindow = Window
  .partitionBy(MetaData.Field.parentId.col)

val cols = originalFields.toList.map(col)

combined
      .select(
        collect_list($"_metaData.path").over(mergeWindow).as("__metaDataPaths") ::
          collect_list('_id).over(mergeWindow).as("__ids") ::
          collect_list('_uniqueId).over(mergeWindow).as("__uniqueIds") ::
          collect_list('naicCode).over(mergeWindow).as("__naicCodes") ::
          collect_list('_jobTitle).over(mergeWindow).as("jobTitles") ::
          collect_list('_worksiteState).over(mergeWindow).as("worksiteStates") ::
          collect_list('employerAddress).over(mergeWindow).as("__otherEmployerAddress") ::
          collect_list('requestTypeCount).over(mergeWindow).as("__requestTypeCount") ::
          collect_list('petitionCountPerVisaType).over(mergeWindow).as("__petitionCountPerVisaType") ::
          collect_list('petitionCountPerCitizenship).over(mergeWindow).as("__petitionCountPerCitizenship") ::
          count('totalPetitionCount).over(mergeWindow).as("totalPetitionCount") ::
          cols: _*
      )
      .filter('_uniqueId === '_parentId)

Так что некоторые поля я оставляю как естьнекоторые из них я собираю в виде списка (для дальнейшей обработки)

План выглядит следующим образом:

== Physical Plan ==
*(3) Project [__metaDataPaths#4962, __ids#4964, __uniqueIds#4966, __naicCodes#4968, jobTitles#4970, worksiteStates#4972, __otherEmployerAddress#4974, __requestTypeCount#4976, __petitionCountPerVisaType#4978, __petitionCountPerCitizenship#4980, totalPetitionCount#4983L, naicCode#4495, _jobTitle#4496, __expectedCombination#4499, _id#4500, _worksiteState#4497, _parentId#4515L, _metaData#4494, _toBeReviewed#4502, _uniqueId#4498L, employerName#4491, employerAddress#4493, employerLegalStatus#4492, potentialVisaSponsorship#4944, ... 8 more fields]
+- *(3) Filter (isnotnull(_uniqueId#4498L) && (_uniqueId#4498L = _parentId#4515L))
   +- Window [collect_list(_metaData#4494.originalFilePath, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __metaDataPaths#4962, collect_list(_id#4500, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __ids#4964, collect_list(_uniqueId#4498L, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __uniqueIds#4966, collect_list(naicCode#4495, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __naicCodes#4968, collect_list(_jobTitle#4496, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS jobTitles#4970, collect_list(_worksiteState#4497, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS worksiteStates#4972, collect_list(employerAddress#4493, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __otherEmployerAddress#4974, collect_list(requestTypeCount#4507, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __requestTypeCount#4976, collect_list(petitionCountPerVisaType#4503, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __petitionCountPerVisaType#4978, collect_list(petitionCountPerCitizenship#4510, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS __petitionCountPerCitizenship#4980, count(totalPetitionCount#4514) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS totalPetitionCount#4983L, collect_list(potentialVisaSponsorship#4505, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS potentialVisaSponsorship#4944, collect_list(_reviewedCategory#4501, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _reviewedCategory#4946, collect_list(otherEmployerName#4506, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS otherEmployerName#4958, collect_list(employerPhone#4511, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS employerPhone#4956, collect_list(lawyer#4512, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS lawyer#4960, collect_list(organizationFlag#4504, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS organizationFlag#4952, collect_list(primarySub#4513, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS primarySub#4948, collect_list(primaryCrop#4508, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS primaryCrop#4950, collect_list(natureOfTemporaryNeed#4509, 0, 0) windowspecdefinition(_parentId#4515L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS natureOfTemporaryNeed#4954], [_parentId#4515L]
      +- *(2) Sort [_parentId#4515L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_parentId#4515L, 1000)
            +- *(1) Project [naicCode#4495, _jobTitle#4496, __expectedCombination#4499, _id#4500, _worksiteState#4497, _parentId#4515L, _metaData#4494, _toBeReviewed#4502, _uniqueId#4498L, employerName#4491, employerAddress#4493, employerLegalStatus#4492, requestTypeCount#4507, petitionCountPerVisaType#4503, petitionCountPerCitizenship#4510, totalPetitionCount#4514, potentialVisaSponsorship#4505, _reviewedCategory#4501, otherEmployerName#4506, employerPhone#4511, lawyer#4512, organizationFlag#4504, primarySub#4513, primaryCrop#4508, natureOfTemporaryNeed#4509]
               +- *(1) Filter isnotnull(_parentId#4515L)
                  +- *(1) FileScan parquet default.combinationrule4[employerName#4491,employerLegalStatus#4492,employerAddress#4493,_metaData#4494,naicCode#4495,_jobTitle#4496,_worksiteState#4497,_uniqueId#4498L,__expectedCombination#4499,_id#4500,_reviewedCategory#4501,_toBeReviewed#4502,petitionCountPerVisaType#4503,organizationFlag#4504,potentialVisaSponsorship#4505,otherEmployerName#4506,requestTypeCount#4507,primaryCrop#4508,natureOfTemporaryNeed#4509,petitionCountPerCitizenship#4510,employerPhone#4511,lawyer#4512,primarySub#4513,totalPetitionCount#4514,_parentId#4515L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/Users/tlous/Development/Scala/importer-pipeline/spark-warehouse/combinati..., PartitionFilters: [], PushedFilters: [IsNotNull(_parentId)], ReadSchema: struct<employerName:string,employerLegalStatus:string,employerAddress:map<string,string>,_metaDat...

В конце концов, он должен получить 1 млн записей.Однако есть перекос.Некоторые записи будут содержать 10 000+ совпадений, некоторые 1 или ни одной.

Работа выполняется для пары 1000 записей с небольшим перекосом, но в 16-ядерном кластере EMR с 5 узлами это займет более 5 часов.Так что не хорошо.

Как я могу оптимизировать этот запрос?Должен ли я использовать groupBy и, если да, как выбрать самую последнюю запись из всех сгруппированных полей.Есть ли более разумный способ?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...