У меня есть следующий код Spark SQL, который проверяет отсутствие определенных дат в больших таблицах (несколько миллиардов строк):
spark = SparkSession.builder \
.master("yarn") \
.appName("minimal_example") \
.config('spark.submit.deployMode', 'client') \
.getOrCreate()
SQL = '''
select distinct
substr(entrydate, 1, 10) as datum,
1 as in_table
from {table}
where entrydate >= '{datum}'
'''
print("RUN1")
df1 = spark.sql(SQL.format(datum='2017-01-01', table='table1'))
c1 = df1.count()
print("count1: ", c1)
print("RUN2")
df2 = spark.sql(SQL.format(datum='2017-01-01', table='table2'))
c2 = df2.count()
print("count2: ", c2)
По сути, функция просто получает разные даты из столбца таблицы.
Теперь часть, которую я не могу обернуть головой:
- Каждый вызов
count()
сам по себе работает нормально
- Когда я запускаю каждый вызов как отдельное
spark-submit
задание, оно отлично работает
- Но если запустить их последовательно, как описано выше, второй запуск выдаст следующую ошибку:
py4j.protocol.Py4JJavaError: An error occurred while calling o150.sql.
: java.util.concurrent.ExecutionException: java.io.IOException: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
Моя интерпретация заключается в том, что сборка мусора с первого запуска начинается во время второго запуска.
Что я пробовал:
- Вызывать spark.clearCache () в начале каждой итерации
- Вызов
spark._jvm.SparkSession.clearDefaultSession()
, spark._jvm.SparkSession.clearActiveSession()
в начале каждой итерации
- Посмотрите на веб-интерфейс Spark и попробуйте разобраться с вкладками DAG и Storage (последняя ничего не отображает), но безрезультатно
- Изменить порядок двух
count
с. Это приводит к другой ошибке: java.io.IOException: Connection reset by peer
(см. здесь для аналогичной ошибки)
Последнее замечание: первый вызов раскручивает> 100 исполнителей Spark / YARN, может, механизму динамического распределения Spark не нравится, что второй вызов фактически является новой работой, которая предъявляет другие требования к исполнителям?
Любая помощь очень ценится!
Среда: Spark 2.3 на кластере Cloudera CDH 6.1.
Редактировать: некоторые подробности
- Таблицы сохраняются в виде файлов паркета в HDFS, статистика:
+--------+------------+-------+--------+--------------+
| table | # rows |# cols |# files | raw size |
+--------+------------+-------+--------+--------------+
| table1 | 5660970439 | 46 | 49167 | 228876171398 |
| table2 | 5656000217 | 52 | 80000 | 518996700170 |
+--------+------------+-------+--------+--------------+
- Настройки памяти: Spark on YARN с динамическим распределением, минимальная память исполнителя 1 ГБ, максимальная 72 ГБ, общая память кластера ~ 300 ГБ.
- Первый
count()
запускает около 150 исполнителей, полностью используя имеющиеся в настоящее время ресурсы памяти