PySpark: память Numpy не освобождается в функции карты-раздела исполнителя (утечка памяти) - PullRequest
0 голосов
/ 01 ноября 2018

У меня есть следующий минимальный рабочий пример:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import numpy as np

sc = SparkContext()
sqlContext = SQLContext(sc)

# Create dummy pySpark DataFrame with 1e5 rows and 16 partitions
df = sqlContext.range(0, int(1e5), numPartitions=16)

def toy_example(rdd):

    # Read in pySpark DataFrame partition
    data = list(rdd)

    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))

    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    # Return a single `0` value
    return [[0]]

# Execute the above function on each partition (16 partitions)
result = df.rdd.mapPartitions(toy_example)
result = result.collect()

При выполнении вышеизложенного память процесса Python исполнителя постоянно увеличивается после каждой итерации, предполагая, что память предыдущей итерации не освобождается, т. Е. Происходит утечка памяти. Это может привести к сбою задания, если объем памяти превышает предел памяти исполнителя - см. Ниже:

enter image description here

Любое из следующих действий предотвращает утечку памяти:

  • Удалить строку data = list(rdd)
  • Вставить строку rand_data = list(rand_data.tolist()) после rand_data = np.random.random(int(1e7))
  • Удалить строку int(e)

Приведенный выше код является минимальным рабочим примером гораздо большего проекта, который не может использовать вышеуказанные исправления.

Некоторые вещи, на которые следует обратить внимание:

  • Пока данные rdd не используются в функции, линия должна воспроизвести утечку. В реальном проекте используются данные rdd.
  • Вероятно, утечка памяти вызвана отсутствием большого массива Numpy rand_data
  • Вы должны выполнить операцию int для каждого элемента rand_data, чтобы воспроизвести утечку ?

Вопрос

Можете ли вы заставить исполнителя PySpark освободить память rand_data, вставив код в первые несколько строк или последние несколько строк функции toy_example?

То, что уже было предпринято

Принудительно собрать мусор, вставив в конце функции:

del data, rand_data
import gc
gc.collect()

Принудительно освободить память, вставив в конец или в начало функции (навеяно вопрос Панды ):

from ctypes import cdll, CDLL
cdll.LoadLibrary("libc.so.6")
libc = CDLL("libc.so.6")
libc.malloc_trim(0)

Настройка, измерение и версии

Следующее задание PySpark было выполнено в кластере AWS EMR с одним рабочим узлом m4.xlarge. Numpy должен был быть установлен на рабочем узле с помощью bootstrapping .

Память исполнителя была измерена с использованием следующей функции (выводится в журнал исполнителя):

import resource
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

Конфигурация отправки Spark:

  • spark.executor.instances = 1
  • spark.executor.cores = 1
  • spark.executor.memory = 6 г
  • spark.master = пряжа
  • spark.dynamicAllocation.enabled = false

Версия:

  • EMR 5.12.1
  • Spark 2.2.1
  • Python 2.7.13
  • Numpy 1.14.0

1 Ответ

0 голосов
/ 23 апреля 2019

Недавно мы столкнулись с очень похожей проблемой, и мы также не смогли принудительно освободить память, изменив код. Однако у нас сработало следующее использование Spark: spark.python.worker.reuse = False

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...