«Превышен лимит накладных расходов GC» при выполнении двух действий в одном задании Spark; нет проблем при запуске отдельно - PullRequest
0 голосов
/ 20 марта 2019

У меня есть следующий код Spark SQL, который проверяет отсутствие определенных дат в больших таблицах (несколько миллиардов строк):

spark = SparkSession.builder \
    .master("yarn") \
    .appName("minimal_example") \
    .config('spark.submit.deployMode', 'client') \
    .getOrCreate()

SQL = '''
select distinct
  substr(entrydate, 1, 10) as datum,
  1 as in_table
from {table}
where entrydate >= '{datum}'
'''

print("RUN1")
df1 = spark.sql(SQL.format(datum='2017-01-01', table='table1'))
c1 = df1.count()
print("count1: ", c1)

print("RUN2")
df2 = spark.sql(SQL.format(datum='2017-01-01', table='table2'))
c2 = df2.count()
print("count2: ", c2)

По сути, функция просто получает разные даты из столбца таблицы.

Теперь часть, которую я не могу обернуть головой:

  • Каждый вызов count() сам по себе работает нормально
  • Когда я запускаю каждый вызов как отдельное spark-submit задание, оно отлично работает
  • Но если запустить их последовательно, как описано выше, второй запуск выдаст следующую ошибку:
py4j.protocol.Py4JJavaError: An error occurred while calling o150.sql.
: java.util.concurrent.ExecutionException: java.io.IOException: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded

Моя интерпретация заключается в том, что сборка мусора с первого запуска начинается во время второго запуска.

Что я пробовал:

  1. Вызывать spark.clearCache () в начале каждой итерации
  2. Вызов spark._jvm.SparkSession.clearDefaultSession(), spark._jvm.SparkSession.clearActiveSession() в начале каждой итерации
  3. Посмотрите на веб-интерфейс Spark и попробуйте разобраться с вкладками DAG и Storage (последняя ничего не отображает), но безрезультатно
  4. Изменить порядок двух count с. Это приводит к другой ошибке: java.io.IOException: Connection reset by peer (см. здесь для аналогичной ошибки)

Последнее замечание: первый вызов раскручивает> 100 исполнителей Spark / YARN, может, механизму динамического распределения Spark не нравится, что второй вызов фактически является новой работой, которая предъявляет другие требования к исполнителям?

Любая помощь очень ценится!

Среда: Spark 2.3 на кластере Cloudera CDH 6.1.

Редактировать: некоторые подробности

  • Таблицы сохраняются в виде файлов паркета в HDFS, статистика:
   +--------+------------+-------+--------+--------------+
   | table  |   # rows   |# cols |# files |   raw size   |
   +--------+------------+-------+--------+--------------+
   | table1 | 5660970439 |    46 |  49167 | 228876171398 |
   | table2 | 5656000217 |    52 |  80000 | 518996700170 |
   +--------+------------+-------+--------+--------------+
  • Настройки памяти: Spark on YARN с динамическим распределением, минимальная память исполнителя 1 ГБ, максимальная 72 ГБ, общая память кластера ~ 300 ГБ.
  • Первый count() запускает около 150 исполнителей, полностью используя имеющиеся в настоящее время ресурсы памяти

1 Ответ

0 голосов
/ 22 марта 2019

После разрешения проблемы в течение нескольких дней, я просто попытался увеличить драйвер память:

spark2-submit --master yarn --deploy-mode client --driver-memory 4G minimal_example.py

Возможно, решающим фактором было то, что мойприложение запускается в режиме client.По-видимому, управление большим количеством исполнителей (и их удаление) стоит довольно много памяти, хотя сам драйвер получает только результат простого df.count().

...