Все исполнители мертвы MinHa sh L SH PySpark примерно Схожесть Присоединиться к самоподключению на кластере EMR - PullRequest
1 голос
/ 28 мая 2020

Я сталкиваюсь с проблемами при вызове Spark MinHashL SH примерноSimilarityJoin в кадре данных комбинаций (name_id, name).

Краткое изложение проблемы, которую я пытаюсь решить:

У меня есть фрейм данных около 30 миллионов уникальных комбинаций (name_id, name) для названий компаний. Некоторые из этих названий относятся к одной и той же компании, но (i) написаны с ошибками и / или (ii) содержат дополнительные названия. Выполнение нечеткого сопоставления строк для каждой комбинации невозможно. Чтобы уменьшить количество комбинаций сопоставления нечетких строк, я использую MinHashL SH в Spark. Мой предполагаемый подход состоит в том, чтобы использовать примитивное соединение (самосоединение) с относительно большим порогом Жаккарда, чтобы я мог запускать алгоритм нечеткого сопоставления для сопоставленных комбинаций для дальнейшего улучшения разрешения неоднозначности.

Краткое изложение шагов, которые я предпринял:

  1. Используется CountVectorizer для создания вектора количества символов для каждого имени,
  2. Используется MinHashL SH и его примерное подобие Присоединяйтесь к следующему настройки:
    • numHashTables = 100
    • порог = 0,3 (порог Жаккарда для приблизительноSimilarityJoin)
  3. После приблизительноSimilarityJoin удаляю повторяющиеся комбинации (для которых что существует согласованная комбинация (i, j) и (j, i), затем я удаляю (j, i))
  4. После удаления повторяющихся комбинаций я запускаю алгоритм сопоставления нечеткой строки с использованием пакета FuzzyWuzzy чтобы уменьшить количество записей и улучшить устранение неоднозначности имен.
  5. В конце концов я запускаю connectedCom Алгоритм компонентов на оставшихся краях (i, j) для сопоставления названий компаний, принадлежащих друг другу.

Используемая часть кода:

    id_col = 'id'
    name_col = 'name'
    num_hastables = 100
    max_jaccard = 0.3
    fuzzy_threshold = 90
    fuzzy_method = fuzz.token_set_ratio

    # Calculate edges using minhash practices
    edges = MinHashLSH(inputCol='vectorized_char_lst', outputCol='hashes', numHashTables=num_hastables).\
        fit(data).\
        approxSimilarityJoin(data, data, max_jaccard).\
        select(col('datasetA.'+id_col).alias('src'),
               col('datasetA.clean').alias('src_name'),
               col('datasetB.'+id_col).alias('dst'),
               col('datasetB.clean').alias('dst_name')).\
        withColumn('comb', sort_array(array(*('src', 'dst')))).\
        dropDuplicates(['comb']).\
        rdd.\
        filter(lambda x: fuzzy_method(x['src_name'], x['dst_name']) >= fuzzy_threshold if x['src'] != x['dst'] else False).\
        toDF().\
        drop(*('src_name', 'dst_name', 'comb'))

Объясните план edges

== Physical Plan ==
*(5) HashAggregate(keys=[datasetA#232, datasetB#263], functions=[])
+- Exchange hashpartitioning(datasetA#232, datasetB#263, 200)
   +- *(4) HashAggregate(keys=[datasetA#232, datasetB#263], functions=[])
      +- *(4) Project [datasetA#232, datasetB#263]
         +- *(4) BroadcastHashJoin [entry#233, hashValue#234], [entry#264, hashValue#265], Inner, BuildRight, (UDF(datasetA#232.vectorized_char_lst, datasetB#263.vectorized_char_lst) < 0.3)
            :- *(4) Project [named_struct(id, id#10, name, name#11, clean, clean#90, char_lst, char_lst#95, vectorized_char_lst, vectorized_char_lst#107, hashes, hashes#225) AS datasetA#232, entry#233, hashValue#234]
            :  +- *(4) Filter isnotnull(hashValue#234)
            :     +- Generate posexplode(hashes#225), [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, hashes#225], false, [entry#233, hashValue#234]
            :        +- *(1) Project [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, UDF(vectorized_char_lst#107) AS hashes#225]
            :           +- InMemoryTableScan [char_lst#95, clean#90, id#10, name#11, vectorized_char_lst#107]
            :                 +- InMemoryRelation [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107], StorageLevel(disk, memory, deserialized, 1 replicas)
            :                       +- *(4) Project [id#10, name#11, pythonUDF0#114 AS clean#90, pythonUDF2#116 AS char_lst#95, UDF(pythonUDF2#116) AS vectorized_char_lst#107]
            :                          +- BatchEvalPython [<lambda>(name#11), <lambda>(<lambda>(name#11)), <lambda>(<lambda>(name#11))], [id#10, name#11, pythonUDF0#114, pythonUDF1#115, pythonUDF2#116]
            :                             +- SortAggregate(key=[name#11], functions=[first(id#10, false)])
            :                                +- *(3) Sort [name#11 ASC NULLS FIRST], false, 0
            :                                   +- Exchange hashpartitioning(name#11, 200)
            :                                      +- SortAggregate(key=[name#11], functions=[partial_first(id#10, false)])
            :                                         +- *(2) Sort [name#11 ASC NULLS FIRST], false, 0
            :                                            +- Exchange RoundRobinPartitioning(8)
            :                                               +- *(1) Filter AtLeastNNulls(n, id#10,name#11)
            :                                                  +- *(1) FileScan csv [id#10,name#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:<path>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, int, false], input[2, vector, true]))
               +- *(3) Project [named_struct(id, id#10, name, name#11, clean, clean#90, char_lst, char_lst#95, vectorized_char_lst, vectorized_char_lst#107, hashes, hashes#256) AS datasetB#263, entry#264, hashValue#265]
                  +- *(3) Filter isnotnull(hashValue#265)
                     +- Generate posexplode(hashes#256), [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, hashes#256], false, [entry#264, hashValue#265]
                        +- *(2) Project [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, UDF(vectorized_char_lst#107) AS hashes#256]
                           +- InMemoryTableScan [char_lst#95, clean#90, id#10, name#11, vectorized_char_lst#107]
                                 +- InMemoryRelation [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- *(4) Project [id#10, name#11, pythonUDF0#114 AS clean#90, pythonUDF2#116 AS char_lst#95, UDF(pythonUDF2#116) AS vectorized_char_lst#107]
                                          +- BatchEvalPython [<lambda>(name#11), <lambda>(<lambda>(name#11)), <lambda>(<lambda>(name#11))], [id#10, name#11, pythonUDF0#114, pythonUDF1#115, pythonUDF2#116]
                                             +- SortAggregate(key=[name#11], functions=[first(id#10, false)])
                                                +- *(3) Sort [name#11 ASC NULLS FIRST], false, 0
                                                   +- Exchange hashpartitioning(name#11, 200)
                                                      +- SortAggregate(key=[name#11], functions=[partial_first(id#10, false)])
                                                         +- *(2) Sort [name#11 ASC NULLS FIRST], false, 0
                                                            +- Exchange RoundRobinPartitioning(8)
                                                               +- *(1) Filter AtLeastNNulls(n, id#10,name#11)
                                                                  +- *(1) FileScan csv [id#10,name#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:<path>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string>

Как data выглядит:

+-------+--------------------+--------------------+--------------------+--------------------+
|     id|                name|               clean|            char_lst| vectorized_char_lst|
+-------+--------------------+--------------------+--------------------+--------------------+
|3633038|MURATA MACHINERY LTD|    MURATA MACHINERY|[M, U, R, A, T, A...|(33,[0,1,2,3,4,5,...|
|3632811|SOCIETE ANONYME D...|SOCIETE ANONYME D...|[S, O, C, I, E, T...|(33,[0,1,2,3,4,5,...|
|3632655|FUJIFILM CORPORATION|            FUJIFILM|[F, U, J, I, F, I...|(33,[3,10,12,13,2...|
|3633318|HEINE OPTOTECHNIK...|HEINE OPTOTECHNIK...|[H, E, I, N, E,  ...|(33,[0,1,2,3,4,5,...|
|3633523|SUNBEAM PRODUCTS INC|    SUNBEAM PRODUCTS|[S, U, N, B, E, A...|(33,[0,1,2,4,5,6,...|
|3633300|           HIVAL LTD|               HIVAL|     [H, I, V, A, L]|(33,[2,3,10,11,21...|
|3632657|             NSK LTD|                 NSK|           [N, S, K]|(33,[5,6,16],[1.0...|
|3633240|REHABILITATION IN...|REHABILITATION IN...|[R, E, H, A, B, I...|(33,[0,1,2,3,4,5,...|
|3632732|STUDIENGESELLSCHA...|STUDIENGESELLSCHA...|[S, T, U, D, I, E...|(33,[0,1,2,3,4,5,...|
|3632866|ENERGY CONVERSION...|ENERGY CONVERSION...|[E, N, E, R, G, Y...|(33,[0,1,3,5,6,7,...|
|3632895|ERGENICS POWER SY...|ERGENICS POWER SY...|[E, R, G, E, N, I...|(33,[0,1,3,4,5,6,...|
|3632897| MOLI ENERGY LIMITED|         MOLI ENERGY|[M, O, L, I,  , E...|(33,[0,1,3,5,7,8,...|
|3633275| NORDSON CORPORATION|             NORDSON|[N, O, R, D, S, O...|(33,[5,6,7,8,14],...|
|3633256|  PEROXIDCHEMIE GMBH|       PEROXIDCHEMIE|[P, E, R, O, X, I...|(33,[0,3,7,8,9,11...|
|3632695|      POWER CELL INC|          POWER CELL|[P, O, W, E, R,  ...|(33,[0,1,7,8,9,10...|
|3633037|        ERGENICS INC|            ERGENICS|[E, R, G, E, N, I...|(33,[0,3,5,6,8,9,...|
|3632878|  FORD MOTOR COMPANY|          FORD MOTOR|[F, O, R, D,  , M...|(33,[1,4,7,8,13,1...|
|3632573|    SAFT AMERICA INC|        SAFT AMERICA|[S, A, F, T,  , A...|(33,[0,1,2,3,4,6,...|
|3632852|ALCAN INTERNATION...| ALCAN INTERNATIONAL|[A, L, C, A, N,  ...|(33,[0,1,2,3,4,5,...|
|3632698|   KRUPPKOPPERS GMBH|        KRUPPKOPPERS|[K, R, U, P, P, K...|(33,[0,6,7,8,12,1...|
|3633150|ALCAN INTERNATION...| ALCAN INTERNATIONAL|[A, L, C, A, N,  ...|(33,[0,1,2,3,4,5,...|
|3632761|AMERICAN TELEPHON...|AMERICAN TELEPHON...|[A, M, E, R, I, C...|(33,[0,1,2,3,4,5,...|
|3632757|HITACHI KOKI COMP...|        HITACHI KOKI|[H, I, T, A, C, H...|(33,[1,2,3,4,7,9,...|
|3632836|HUGHES AIRCRAFT C...|     HUGHES AIRCRAFT|[H, U, G, H, E, S...|(33,[0,1,2,3,4,6,...|
|3633152|            SOSY INC|                SOSY|        [S, O, S, Y]|(33,[6,7,18],[2.0...|
|3633052|HAMAMATSU PHOTONI...|HAMAMATSU PHOTONI...|[H, A, M, A, M, A...|(33,[1,2,3,4,5,6,...|
|3633450|       AKZO NOBEL NV|          AKZO NOBEL|[A, K, Z, O,  , N...|(33,[0,1,2,5,7,10...|
|3632713| ELTRON RESEARCH INC|     ELTRON RESEARCH|[E, L, T, R, O, N...|(33,[0,1,2,4,5,6,...|
|3632533|NEC ELECTRONICS C...|     NEC ELECTRONICS|[N, E, C,  , E, L...|(33,[0,1,3,4,5,6,...|
|3632562| TARGETTI SANKEY SPA| TARGETTI SANKEY SPA|[T, A, R, G, E, T...|(33,[0,1,2,3,4,5,...|
+-------+--------------------+--------------------+--------------------+--------------------+
only showing top 30 rows

Используемое оборудование:

  1. Главный узел: m5.2xlarge 8 виртуальных ядер, 32 ГиБ памяти, только хранилище EBS Хранилище EBS: 128 ГиБ
  2. Подчиненные узлы (10x): m5.4xlarge 16 виртуальных ядер, 64 ГиБ памяти , Только хранилище EBS Хранилище EBS: 500 ГиБ

Используемые параметры отправки Spark:

spark-submit --master yarn --conf "spark.executor.instances=40" --conf "spark.default.parallelism=640" --conf "spark.shuffle.partitions=2000" --conf "spark.executor.cores=4" --conf "spark.executor.memory=14g" --conf "spark.driver.memory=14g" --conf "spark.driver.maxResultSize=14g" --conf "spark.dynamicAllocation.enabled=false" --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 run_disambiguation.py

Ошибки задачи из веб-интерфейса

ExecutorLostFailure (executor 21 exited caused by one of the running tasks) Reason: Slave lost
ExecutorLostFailure (executor 31 exited unrelated to the running tasks) Reason: Container marked as failed: container_1590592506722_0001_02_000002 on host: ip-172-31-47-180.eu-central-1.compute.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.

(Часть) журналов исполнителей:


20/05/27 16:29:09 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (25  times so far)
20/05/27 16:29:13 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:29:15 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:29:17 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (0  time so far)
20/05/27 16:29:28 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:29:28 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:29:33 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:29:38 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (1  time so far)
20/05/27 16:29:42 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:29:46 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:29:53 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:29:57 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (2  times so far)
20/05/27 16:30:00 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:30:05 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:30:10 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:30:15 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (3  times so far)
20/05/27 16:30:19 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:30:22 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:30:29 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:30:32 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (4  times so far)
20/05/27 16:30:39 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:30:39 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:30:46 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:30:47 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (5  times so far)
20/05/27 16:30:55 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:30:59 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:31:03 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:31:06 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (6  times so far)
20/05/27 16:31:13 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:31:14 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:31:22 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:31:24 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (7  times so far)
20/05/27 16:31:30 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:31:32 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:31:41 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:31:44 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (8  times so far)
20/05/27 16:31:47 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:31:48 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:32:02 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:03 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (9  times so far)
20/05/27 16:32:04 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:32:08 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:32:19 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:32:20 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:21 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (10  times so far)
20/05/27 16:32:26 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:32:37 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:32:37 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (11  times so far)
20/05/27 16:32:38 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:32:45 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:51 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:32:56 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (12  times so far)
20/05/27 16:32:58 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:33:03 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:33:08 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:33:13 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (13  times so far)
20/05/27 16:33:15 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:33:20 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:33:26 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:33:30 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:33:31 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (14  times so far)
20/05/27 16:33:36 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:33:46 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:33:47 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:33:51 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (15  times so far)
20/05/27 16:33:54 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:34:03 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:34:04 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:08 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (16  times so far)
20/05/27 16:34:14 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:34:16 INFO PythonUDFRunner: Times: total = 774701, boot = 3, init = 10, finish = 774688
20/05/27 16:34:21 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:22 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (17  times so far)
20/05/27 16:34:30 INFO PythonUDFRunner: Times: total = 773372, boot = 2, init = 9, finish = 773361
20/05/27 16:34:32 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:34:39 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (18  times so far)
20/05/27 16:34:46 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:52 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (19  times so far)
20/05/27 16:35:01 INFO PythonUDFRunner: Times: total = 776905, boot = 3, init = 11, finish = 776891
20/05/27 16:35:05 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (20  times so far)
20/05/27 16:35:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (21  times so far)
20/05/27 16:35:35 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (22  times so far)
20/05/27 16:35:52 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (23  times so far)
20/05/27 16:36:10 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (24  times so far)
20/05/27 16:36:29 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (25  times so far)
20/05/27 16:36:47 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:37:06 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:37:25 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:37:44 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:38:03 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:38:22 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:38:41 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:38:59 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:39:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:39:39 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:39:58 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:40:18 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:40:38 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:40:57 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:41:16 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:41:35 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:41:55 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:42:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:42:41 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:42:59 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/05/27 16:42:59 INFO DiskBlockManager: Shutdown hook called
20/05/27 16:42:59 INFO ShutdownHookManager: Shutdown hook called
20/05/27 16:42:59 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1590592506722_0001/spark-73af8e3b-f428-47d4-9e13-fed4e19cc2cd
2020-05-27T16:41:16.336+0000: [GC (Allocation Failure) 2020-05-27T16:41:16.336+0000: [ParNew: 272234K->242K(305984K), 0.0094375 secs] 9076907K->8804915K(13188748K), 0.0094895 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:34.686+0000: [GC (Allocation Failure) 2020-05-27T16:41:34.686+0000: [ParNew: 272242K->257K(305984K), 0.0084179 secs] 9076915K->8804947K(13188748K), 0.0084840 secs] [Times: user=0.09 sys=0.01, real=0.01 secs] 
2020-05-27T16:41:35.145+0000: [GC (Allocation Failure) 2020-05-27T16:41:35.145+0000: [ParNew: 272257K->1382K(305984K), 0.0095541 secs] 9076947K->8806073K(13188748K), 0.0096080 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:55.077+0000: [GC (Allocation Failure) 2020-05-27T16:41:55.077+0000: [ParNew: 273382K->2683K(305984K), 0.0097177 secs] 9078073K->8807392K(13188748K), 0.0097754 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:55.513+0000: [GC (Allocation Failure) 2020-05-27T16:41:55.513+0000: [ParNew: 274683K->3025K(305984K), 0.0093345 secs] 9079392K->8807734K(13188748K), 0.0093892 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:05.481+0000: [GC (Allocation Failure) 2020-05-27T16:42:05.481+0000: [ParNew: 275025K->4102K(305984K), 0.0092950 secs] 9079734K->8808830K(13188748K), 0.0093464 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:18.711+0000: [GC (Allocation Failure) 2020-05-27T16:42:18.711+0000: [ParNew: 276102K->2972K(305984K), 0.0098928 secs] 9080830K->8807700K(13188748K), 0.0099510 secs] [Times: user=0.13 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:36.493+0000: [GC (Allocation Failure) 2020-05-27T16:42:36.493+0000: [ParNew: 274972K->3852K(305984K), 0.0094324 secs] 9079700K->8808598K(13188748K), 0.0094897 secs] [Times: user=0.11 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:40.880+0000: [GC (Allocation Failure) 2020-05-27T16:42:40.880+0000: [ParNew: 275852K->2568K(305984K), 0.0111794 secs] 9080598K->8807882K(13188748K), 0.0112352 secs] [Times: user=0.13 sys=0.00, real=0.01 secs] 
Heap
 par new generation   total 305984K, used 261139K [0x0000000440000000, 0x0000000454c00000, 0x0000000483990000)
  eden space 272000K,  95% used [0x0000000440000000, 0x000000044fc82cf8, 0x00000004509a0000)
  from space 33984K,   7% used [0x00000004509a0000, 0x0000000450c220a8, 0x0000000452ad0000)
  to   space 33984K,   0% used [0x0000000452ad0000, 0x0000000452ad0000, 0x0000000454c00000)
 concurrent mark-sweep generation total 12882764K, used 8805314K [0x0000000483990000, 0x0000000795e63000, 0x00000007c0000000)
 Metaspace       used 77726K, capacity 79553K, committed 79604K, reserved 1118208K
  class space    used 10289K, capacity 10704K, committed 10740K, reserved 1048576K

Скриншоты исполнителей

То, что я пробовал:

  • Изменение spark.sql.shuffle.partitions
  • Изменение spark.default.parallelism
  • Перераспределение фрейма данных

Как решить эту проблему?

Заранее спасибо!

Thi js

Ответы [ 3 ]

2 голосов
/ 04 июня 2020

Ответ @ lokk3r действительно помог мне в правильном направлении. Однако было еще несколько вещей, которые мне пришлось сделать, прежде чем я смог запустить программу без ошибок. Я поделюсь ими, чтобы помочь людям, у которых есть похожие проблемы:

  • Прежде всего, я использовал NGrams, как предлагается @ lokk3r, вместо отдельных символов, чтобы избежать сильного перекоса данных внутри MinHashL SH алгоритм. При использовании 4-граммов data выглядит так:
+------------------------------+-------+------------------------------+------------------------------+------------------------------+
|                          name|     id|                         clean|                   ng_char_lst|           vectorized_char_lst|
+------------------------------+-------+------------------------------+------------------------------+------------------------------+
|     SOCIETE ANONYME DITE SAFT|3632811|     SOCIETE ANONYME DITE SAFT|[  S O C, S O C I, O C I E,...|(1332,[64,75,82,84,121,223,...|
|          MURATA MACHINERY LTD|3633038|              MURATA MACHINERY|[  M U R, M U R A, U R A T,...|(1332,[55,315,388,437,526,5...|
|HEINE OPTOTECHNIK GMBH AND ...|3633318|    HEINE OPTOTECHNIK GMBH AND|[  H E I, H E I N, E I N E,...|(1332,[23,72,216,221,229,34...|
|          FUJIFILM CORPORATION|3632655|                      FUJIFILM|[  F U J, F U J I, U J I F,...|(1332,[157,179,882,1028],[1...|
|          SUNBEAM PRODUCTS INC|3633523|              SUNBEAM PRODUCTS|[  S U N, S U N B, U N B E,...|(1332,[99,137,165,175,187,1...|
| STUDIENGESELLSCHAFT KOHLE MBH|3632732| STUDIENGESELLSCHAFT KOHLE MBH|[  S T U, S T U D, T U D I,...|(1332,[13,14,23,25,43,52,57...|
|REHABILITATION INSTITUTE OF...|3633240|REHABILITATION INSTITUTE OF...|[  R E H, R E H A, E H A B,...|(1332,[20,44,51,118,308,309...|
|           NORDSON CORPORATION|3633275|                       NORDSON|[  N O R, N O R D, O R D S,...|(1332,[45,88,582,1282],[1.0...|
|     ENERGY CONVERSION DEVICES|3632866|     ENERGY CONVERSION DEVICES|[  E N E, E N E R, N E R G,...|(1332,[54,76,81,147,202,224...|
|           MOLI ENERGY LIMITED|3632897|                   MOLI ENERGY|[  M O L, M O L I, O L I  ,...|(1332,[438,495,717,756,1057...|
|    ERGENICS POWER SYSTEMS INC|3632895|        ERGENICS POWER SYSTEMS|[  E R G, E R G E, R G E N,...|(1332,[6,10,18,21,24,35,375...|
|                POWER CELL INC|3632695|                    POWER CELL|[  P O W, P O W E, O W E R,...|(1332,[6,10,18,35,126,169,3...|
|            PEROXIDCHEMIE GMBH|3633256|                 PEROXIDCHEMIE|[  P E R, P E R O, E R O X,...|(1332,[326,450,532,889,1073...|
|            FORD MOTOR COMPANY|3632878|                    FORD MOTOR|[  F O R, F O R D, O R D  ,...|(1332,[156,158,186,200,314,...|
|                  ERGENICS INC|3633037|                      ERGENICS|[  E R G, E R G E, R G E N,...|(1332,[375,642,812,866,1269...|
|              SAFT AMERICA INC|3632573|                  SAFT AMERICA|[  S A F, S A F T, A F T  ,...|(1332,[498,552,1116],[1.0,1...|
|   ALCAN INTERNATIONAL LIMITED|3632598|           ALCAN INTERNATIONAL|[  A L C, A L C A, L C A N,...|(1332,[20,434,528,549,571,7...|
|             KRUPPKOPPERS GMBH|3632698|                  KRUPPKOPPERS|[  K R U, K R U P, R U P P,...|(1332,[664,795,798,1010,114...|
|       HUGHES AIRCRAFT COMPANY|3632752|               HUGHES AIRCRAFT|[  H U G, H U G H, U G H E,...|(1332,[605,632,705,758,807,...|
|AMERICAN TELEPHONE AND TELE...|3632761|AMERICAN TELEPHONE AND TELE...|[  A M E, A M E R, M E R I,...|(1332,[19,86,91,126,128,134...|
+------------------------------+-------+------------------------------+------------------------------+------------------------------+

Обратите внимание, что я добавил начальные и конечные пробелы в именах, чтобы убедиться, что порядок слов в имени не соответствует имеет значение для NGrams: 'XX YY' имеет 3 грамма 'XX ', 'X Y', ' YY', а 'YY XX' имеет 3 грамма 'YY ', 'Y X', ' XX'. Это означает, что оба имеют 0 из 6 уникальных NGrams. Если мы используем начальные и конечные пробелы: ' XX YY ' содержит 3 грамма ' XX', 'XX ', 'X Y', ' YY', 'YY ', а ' YY XX ' - 3 грамма ' YY', 'YY ', 'Y X', ' XX', 'XX '. Это означает, что оба имеют 4 из 6 уникальных NGrams. Это означает, что существует гораздо большая вероятность того, что обе записи закончатся в одном сегменте во время MinHashL SH.

  • Я экспериментировал с разными значениями n - входного параметра для NGrams. Я обнаружил, что и n=2, и n=3 по-прежнему дают такой перекос данных, что некоторые задания Spark занимают слишком много времени, в то время как другие выполняются за секунды. Таким образом, вам придется ждать вечно, прежде чем программа продолжится. Теперь я использую n=4, и это по-прежнему дает значительный перекос, но это работоспособно.

  • Чтобы еще больше уменьшить влияние перекоса данных, я также использовал дополнительную фильтрацию (в ) часто встречается NGrams в методе CountVectorizer Spark. Я установил minDF=2 таким образом, чтобы он отфильтровывал NGrams, встречающиеся только в одном имени. Я сделал это, потому что вы не можете сопоставить эти имена на основе NGram, которое все равно встречается только в одном имени. Кроме того, я установил maxDF=0.001 таким образом, чтобы он отфильтровывал NGrams, которые встречаются более чем в 0,1% имен. Это означает, что для примерно 30 миллионов имен будет отфильтровано NGrams, встречающееся чаще, чем в 30000 именах. Я решил, что слишком часто встречающееся NGram не предоставит полезной информации о том, какие имена могут быть сопоставлены.

  • Я уменьшаю количество уникальных имен (сначала 30 миллионов) до 15 миллионов. путем фильтрации нелатинских (расширенных) имен. Я заметил, что символы (например, арабский c и китайский) также вызывают большой перекос в данных. Поскольку меня в первую очередь не интересует устранение неоднозначности в названиях этих компаний, я исключил их из набора данных. Я отфильтровал, используя следующее совпадение регулярных выражений:

re.fullmatch('[\u0020-\u007F\u00A0-\u00FF\u0100-\u017F\u0180-\u024F]+'.encode(), string_to_filter.encode())
  • Это немного простой совет, но я столкнулся с некоторыми проблемами, не увидев его. Убедитесь, что вы запустили фильтр для набора данных, прежде чем передавать его алгоритму MinHashLSH, чтобы отфильтровать записи, у которых не осталось NGrams из-за настроек minDF и maxDF или просто потому, что это маленькое имя. Очевидно, что это не будет работать для алгоритма MinHashLSH.

  • Наконец, что касается настроек команды spark-submit и аппаратных настроек кластера EMR, я обнаружил, что не сделал этого. Мне не нужен кластер большего размера, как предлагали некоторые ответы на форумах. Все вышеперечисленные изменения позволили программе отлично работать в кластере с настройками, указанными в моем исходном сообщении. Уменьшение значений spark.shuffle.partitions, spark.driver.memory и spark.driver.maxResultSize существенно улучшило время работы программы. Я отправил spark-submit:

spark-submit --master yarn --conf "spark.executor.instances=40" --conf "spark.default.parallelism=640" --conf "spark.executor.cores=4" --conf "spark.executor.memory=12g" --conf "spark.driver.memory=8g" --conf "spark.driver.maxResultSize=8g" --conf "spark.dynamicAllocation.enabled=false" --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 run_disambiguation.py
1 голос
/ 29 мая 2020

approxSimilarityJoin будет хорошо распараллеливаться между рабочими процессами только в том случае, если токены, вводимые в MinHa sh, достаточно различны. Поскольку отдельные токены персонажей часто встречаются во многих записях; включите преобразование NGram в свой список персонажей, чтобы каждый жетон появлялся реже; это значительно снизит перекос данных и устранит нагрузку на память.

MinHa sh имитирует процесс создания случайной перестановки вашей совокупности токенов и выбирает токен в наборе образцов, который появляется первым в перестановке. Поскольку вы используете отдельные символы в качестве токенов, допустим, вы выбрали семя MinHa sh, которое делает символ e первым в вашей случайной перестановке. В этом случае каждая строка с буквой e в ней будет иметь соответствующий MinHa sh и будет перетасована к тому же рабочему процессу для сравнения наборов. Это приведет к сильному перекосу данных и ошибкам нехватки памяти.

0 голосов
/ 10 июня 2020

Спасибо за подробное объяснение. Какой порог вы используете и как уменьшаете ложное -ve?

...