pySpark зависает при выполнении задачи Map с превышенным пределом служебных данных G C - конфигурация управления памятью - PullRequest
0 голосов
/ 18 апреля 2020

У меня есть файл ~ 60 ГБ, содержащий ~ 30 миллионов JSON объектов с разделителями строк, и я пытаюсь сопоставить каждому JSON объекту написанную мной функцию и сохранить результаты на диск.

Я использую pySpark в локальном режиме на сервере с 96 ядрами и 1 ТБ памяти.

Моя конфигурация Spark выглядит следующим образом:

configuration_properties = [
("spark.master", "local[96]"),
("spark.ui.port","4050"),
('spark.driver.memory',  '750g'),
("spark.network.timeout",            "10000001"),
("spark.executor.heartbeatInterval", "10000000")
]
conf = SparkConf().setAll( configuration_properties )

Я могу загрузить файл без проблем:

start_time = time.monotonic()
posts = spark.read.json( path_to_file )
print("Finished loading in {:5.3f} minutes.".format( (time.monotonic()-start_time)/60 ))

, который выполняется примерно за 1,8 минуты для всего файла объемом 60 ГБ.

Затем я пытаюсь применить свою функцию и сохранить вывод:

output_df = spark.createDataFrame( posts.rdd.map(my_function), my_schema)
output_df.write.csv(path_to_output, mode='overwrite')

... и вот тут у меня проблемы. Когда я запускаю этот код на небольшом образце данных (первая тысяча строк), он работает безупречно.

Однако, когда я пытаюсь выполнить его для всего файла 60 ГБ, данные загружаются нормально, но затем Spark кажется чтобы зависнуть от фактического вычисления, с пользовательским интерфейсом, говорящим «работает», не делая никакого прогресса вообще:

image">

Если я позволю этому работать некоторое время, он никогда не добивается никакого прогресса, но через пару часов начинает выдавать java.lang.OutOfMemoryError: GC overhead limit exceeded ошибок, которые, как кажется, появляются в виде предупреждений (вместо того, чтобы прямо убивать работу).

Когда я искал других людей, имеющих моя проблема, все остальные потоки, которые я обнаружил, относятся либо к людям, выполняющим большие объединения, либо к groupBys, склонным к перекосу, что не является моей проблемой (так как я просто применяю функцию к каждому элементу в наборе данных ), или проблемы с запуском на YARN или каком-либо другом кластере, что также не относится ко мне, поскольку я работаю локально.

Большое спасибо за вашу помощь!

...