У меня есть приложение, которое использует около 20 RDD для выполнения необходимых вычислений.Можно разделить на два задания:
Из этих 20 СДР каждый использует 15, при этом 10 разделены между обоими.
Каждый алгоритм запускается с 15-кратной фильтрацией по временному окну.Псевдо:
shareRdd1 = spark.read(...).cache()
...
shareRdd10 = spark.read(...).cache()
job1Rdd1 = spark.read(...).cache()
...
job1Rdd5 = spark.read(...).cache()
job2Rdd1 = spark.read(...).cache()
...
job2Rdd5 = spark.read(...).cache()
job1 = new Job1(shareRdd1, ..., shareRdd10, job1Rdd1, ..., job1Rdd5)
job2 = new Job2(shareRdd1, ..., shareRdd10, job2Rdd1, ..., job2Rdd5)
for i=0..14
job1.run(i)
job2.run(i)
Каждое задание использует i
для соответствующего среза данных из RDD, выполнения связок, поиска и т. Д. В конце он сохраняет данные в S3 и помещает данные в RDS.Псевдо:
void run(int i)
var result
shareRdd1.where(<some i related slicing>)
...
join, aggregate, etc
...
result = result.cache()
result.write(...)
result.forEachPartition(<push to rds>)
result.unpersist()
Проблема, которую я вижу, состоит в том, что со временем вся искра-подача все больше и больше занимает память.
Каждый раз, когда работа run
s, мы очищаемвсе внутренние результаты, убедитесь, что они удалены, и т. д. (даже пытались с .unpersist(true)
)
Единственное, что продолжает работать, когда цикл работает, это кэшированные RDD.Они загружают значительный объем работы, поэтому они кэшируются.
Задание выполняется, и вот статистика:
Если вы беретеПосмотрите на сетевой скачок около 01:37, то есть, когда все RDD загружаются из хранилища, и алгоритм начинает выполнять свою работу.По мере продолжения цикла все больше и больше памяти расходуется, пока, в конце концов, задание не остановится (возможно, из-за использования диска, поскольку вещи больше не помещаются в память).
Наши RDD имеют общий размер 3 ГБ.при просмотре вкладки «Хранилище» в веб-интерфейсе Spark.
У машины более чем достаточно памяти, чтобы справиться с ней, я даже пробовал большие машины, но это происходит точно.Кривая такая же.
Что я делаю не так?
spark = 2.3.1
emr = 5.16.0
java = 1.8
1x m4.10xlarge
--driver-memory 8g
--driver-cores 5
--conf spark.dynamicAllocation.enabled=false
--conf spark.yarn.executor.memoryOverhead=2048
--conf spark.yarn.driver.memoryOverhead=2048
--conf spark.executor.instances=14
--conf spark.executor.memory=8g
--conf spark.executor.cores=5
--conf spark.default.parallelism=140