Это ошибка:
Задание прервано из-за сбоя этапа: задание 12 на этапе 37.0 завершилось неудачно 4 раза, самый последний сбой: Потерянное задание 12.3 на этапе 37.0 (TID 325, 10.139.64.5, исполнитель 20): ExecutorLostFailure (исполнитель 20 завершился из-за одного из запущенные задачи) Причина: удаленный клиент RPC отключен. Вероятно, из-за превышения порогов контейнерами или проблем с сетью. Проверьте журналы драйверов на наличие предупреждений *
Так есть ли какой-нибудь альтернативный, более эффективный способ применить эти функции, не вызывая ошибки нехватки памяти? У меня есть данные для подсчета в миллиардах.
Input Dataframe on which filtering is to be done:
+------+-------+-------+------+-------+-------+
|Pos_id|level_p|skill_p|Emp_id|level_e|skill_e|
+------+-------+-------+------+-------+-------+
| 1| 2| a| 100| 2| a|
| 1| 2| a| 100| 3| f|
| 1| 2| a| 100| 2| d|
| 1| 2| a| 101| 4| a|
| 1| 2| a| 101| 5| b|
| 1| 2| a| 101| 1| e|
| 1| 2| a| 102| 5| b|
| 1| 2| a| 102| 3| d|
| 1| 2| a| 102| 2| c|
| 2| 2| d| 100| 2| a|
| 2| 2| d| 100| 3| f|
| 2| 2| d| 100| 2| d|
| 2| 2| d| 101| 4| a|
| 2| 2| d| 101| 5| b|
| 2| 2| d| 101| 1| e|
| 2| 2| d| 102| 5| b|
| 2| 2| d| 102| 3| d|
| 2| 2| d| 102| 2| c|
| 2| 4| b| 100| 2| a|
| 2| 4| b| 100| 3| f|
+------+-------+-------+------+-------+-------+
Код фильтрации:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql import functions as sf
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())
df_result = new_df.withColumn('result', function(sf.col('skill_p'), sf.col('skill_e')))
df_filter = df_result.filter(sf.col("result") == 1)
df_filter.show()
res = df_filter.groupBy("Pos_id", "Emp_id").agg(
sf.collect_set("skill_p").alias("SkillsMatch"),
sf.sum("result").alias("SkillsMatchedCount"))
res.show()
Это нужно сделать для миллиардов строк.