После того, как я увеличил spark.executor.memory для pyspark, он упал там, где раньше проходилКак я могу пройти через это? - PullRequest
1 голос
/ 25 сентября 2019

Короче говоря, я использовал для запуска части кода pyspark в оболочке pyspark с настройками по умолчанию (драйвер 1g, исполнитель 1g).

В некоторых местах код завис из-за некоторой неизвестной утечки памяти после несколькихЦиклы. (Я уже попробовал некоторые приемы, чтобы обрезать происхождение данных, поэтому утечка все еще неизвестна)

Чтобы лучше проверить источник утечки памяти, я должен увеличить память исполнителя, чтобы она былападение после нескольких петель.И я возьму некоторый дамп кучи для анализа.

Однако, когда я увеличил память исполнителя до 2g, приложение дало сбой до того, как оно запустило первый цикл (раньше это происходило при 3-м цикле).Это выглядит так:

2019-09-25 10:16:29,833 WARN execution.CacheManager: Asked to cache already cached data.
Sampling success
iteration #0
2019-09-25 10:16:30,005 WARN util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
/usr/lib64/python2.7/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
iteration 0 loss 13387.430943                                                   

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "bpr_spark2.py", line 458, in test
    bpr.build(train_data, ss, k = number_of_factors, num_iterations=80000)
  File "bpr_spark2.py", line 253, in build
    self.train(ss)
  File "bpr_spark2.py", line 353, in train
    samples = self.draw(ss).withColumn('id', functions.row_number().over(Window.orderBy('u'))).repartition(self.partitionNum)
  File "bpr_spark2.py", line 302, in draw
    while(lastSamples is None or lastSamples.count() < batchSize):
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 522, in count
    return int(self._jdf.count())
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__

  File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1688.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.lang.StackOverflowError
    at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
    at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

Здесь вы можете ожидать очень длинный стек вызовов.Похоже на какую-то операцию сброса.

Я исчез, когда я установил память водителя на 2g.Итак, о чем это?

Для среды, которую я использовал, это hadoop 3.1.2 + spark 2.4.3.С PyArrow, установленным и включенным для spark.

Исходный код для воспроизведения проблемы очень длинный, с установленной базой данных, я буду загружать код py, используемую базу данных можно найти на github / moviegeek.Я также попытаюсь загрузить среду исполнения, которая находится внутри докера.Но это будет дорого, и не ожидайте, что оно выйдет через несколько дней?

Итак, кто-нибудь может сказать мне, почему память драйвера должна совпадать с памятью исполнителя?

Ееэто ссылка на код: https://github.com/Heermosi/moviegeek-spark/blob/master/bpr_calculator_spark.py

Не могу найти точку утечки.Сконфигурируйте базу данных и запустите test (), чтобы повторить ошибку.Кто-нибудь может помочь?

Обновлено 2019/09/26:

Я изо всех сил пытался распечатать происхождение данных rdd и обнаружил, что линия данных на самом деле не обрезается!Это означает, что функция не работает должным образом, метод, опубликованный на веб-сайте, обещал обрезать без записи файлов, это шутка!

Я буду искать другие способы сократить родословную данных.На данный момент, хммм, запись в файлы будет хорошим выбором.

...