Ошибка памяти в PySpark во время вычислений Filter & GroupBy - PullRequest
0 голосов
/ 02 сентября 2018

Это ошибка: Задание прервано из-за сбоя этапа: задание 12 на этапе 37.0 завершилось неудачно 4 раза, самый последний сбой: Потерянное задание 12.3 на этапе 37.0 (TID 325, 10.139.64.5, исполнитель 20): ExecutorLostFailure (исполнитель 20 завершился из-за одного из запущенные задачи) Причина: удаленный клиент RPC отключен. Вероятно, из-за превышения порогов контейнерами или проблем с сетью. Проверьте журналы драйверов на наличие предупреждений *

Так есть ли какой-нибудь альтернативный, более эффективный способ применить эти функции, не вызывая ошибки нехватки памяти? У меня есть данные для подсчета в миллиардах.

Input Dataframe on which filtering is to be done: 
+------+-------+-------+------+-------+-------+
|Pos_id|level_p|skill_p|Emp_id|level_e|skill_e|
+------+-------+-------+------+-------+-------+
|     1|      2|      a|   100|      2|      a|
|     1|      2|      a|   100|      3|      f|
|     1|      2|      a|   100|      2|      d|
|     1|      2|      a|   101|      4|      a|
|     1|      2|      a|   101|      5|      b|
|     1|      2|      a|   101|      1|      e|
|     1|      2|      a|   102|      5|      b|
|     1|      2|      a|   102|      3|      d|
|     1|      2|      a|   102|      2|      c|
|     2|      2|      d|   100|      2|      a|
|     2|      2|      d|   100|      3|      f|
|     2|      2|      d|   100|      2|      d|
|     2|      2|      d|   101|      4|      a|
|     2|      2|      d|   101|      5|      b|
|     2|      2|      d|   101|      1|      e|
|     2|      2|      d|   102|      5|      b|
|     2|      2|      d|   102|      3|      d|
|     2|      2|      d|   102|      2|      c|
|     2|      4|      b|   100|      2|      a|
|     2|      4|      b|   100|      3|      f|
+------+-------+-------+------+-------+-------+

Код фильтрации:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql import functions as sf
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())
df_result = new_df.withColumn('result', function(sf.col('skill_p'), sf.col('skill_e')))

df_filter = df_result.filter(sf.col("result") == 1)
df_filter.show()

res = df_filter.groupBy("Pos_id", "Emp_id").agg(
      sf.collect_set("skill_p").alias("SkillsMatch"),
      sf.sum("result").alias("SkillsMatchedCount"))
res.show()

Это нужно сделать для миллиардов строк.

...