У меня есть следующий минимальный рабочий пример:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import numpy as np
sc = SparkContext()
sqlContext = SQLContext(sc)
# Create dummy pySpark DataFrame with 1e5 rows and 16 partitions
df = sqlContext.range(0, int(1e5), numPartitions=16)
def toy_example(rdd):
# Read in pySpark DataFrame partition
data = list(rdd)
# Generate random data using Numpy
rand_data = np.random.random(int(1e7))
# Apply the `int` function to each element of `rand_data`
for i in range(len(rand_data)):
e = rand_data[i]
int(e)
# Return a single `0` value
return [[0]]
# Execute the above function on each partition (16 partitions)
result = df.rdd.mapPartitions(toy_example)
result = result.collect()
При выполнении вышеизложенного память процесса Python исполнителя постоянно увеличивается после каждой итерации, предполагая, что память предыдущей итерации не освобождается, т. Е. Происходит утечка памяти. Это может привести к сбою задания, если объем памяти превышает предел памяти исполнителя - см. Ниже:
![enter image description here](https://i.stack.imgur.com/gVqB0.png)
Любое из следующих действий предотвращает утечку памяти:
- Удалить строку
data = list(rdd)
- Вставить строку
rand_data = list(rand_data.tolist())
после rand_data = np.random.random(int(1e7))
- Удалить строку
int(e)
Приведенный выше код является минимальным рабочим примером гораздо большего проекта, который не может использовать вышеуказанные исправления.
Некоторые вещи, на которые следует обратить внимание:
- Пока данные
rdd
не используются в функции, линия должна воспроизвести утечку. В реальном проекте используются данные rdd
.
- Вероятно, утечка памяти вызвана отсутствием большого массива Numpy
rand_data
- Вы должны выполнить операцию
int
для каждого элемента rand_data
, чтобы воспроизвести утечку ?
Вопрос
Можете ли вы заставить исполнителя PySpark освободить память rand_data
, вставив код в первые несколько строк или последние несколько строк функции toy_example
?
То, что уже было предпринято
Принудительно собрать мусор, вставив в конце функции:
del data, rand_data
import gc
gc.collect()
Принудительно освободить память, вставив в конец или в начало функции (навеяно вопрос Панды ):
from ctypes import cdll, CDLL
cdll.LoadLibrary("libc.so.6")
libc = CDLL("libc.so.6")
libc.malloc_trim(0)
Настройка, измерение и версии
Следующее задание PySpark было выполнено в кластере AWS EMR с одним рабочим узлом m4.xlarge. Numpy должен был быть установлен на рабочем узле с помощью bootstrapping .
Память исполнителя была измерена с использованием следующей функции (выводится в журнал исполнителя):
import resource
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
Конфигурация отправки Spark:
- spark.executor.instances = 1
- spark.executor.cores = 1
- spark.executor.memory = 6 г
- spark.master = пряжа
- spark.dynamicAllocation.enabled = false
Версия:
- EMR 5.12.1
- Spark 2.2.1
- Python 2.7.13
- Numpy 1.14.0