У меня есть файл ~ 60 ГБ, содержащий ~ 30 миллионов JSON объектов с разделителями строк, и я пытаюсь сопоставить каждому JSON объекту написанную мной функцию и сохранить результаты на диск.
Я использую pySpark в локальном режиме на сервере с 96 ядрами и 1 ТБ памяти.
Моя конфигурация Spark выглядит следующим образом:
configuration_properties = [
("spark.master", "local[96]"),
("spark.ui.port","4050"),
('spark.driver.memory', '750g'),
("spark.network.timeout", "10000001"),
("spark.executor.heartbeatInterval", "10000000")
]
conf = SparkConf().setAll( configuration_properties )
Я могу загрузить файл без проблем:
start_time = time.monotonic()
posts = spark.read.json( path_to_file )
print("Finished loading in {:5.3f} minutes.".format( (time.monotonic()-start_time)/60 ))
, который выполняется примерно за 1,8 минуты для всего файла объемом 60 ГБ.
Затем я пытаюсь применить свою функцию и сохранить вывод:
output_df = spark.createDataFrame( posts.rdd.map(my_function), my_schema)
output_df.write.csv(path_to_output, mode='overwrite')
... и вот тут у меня проблемы. Когда я запускаю этот код на небольшом образце данных (первая тысяча строк), он работает безупречно.
Однако, когда я пытаюсь выполнить его для всего файла 60 ГБ, данные загружаются нормально, но затем Spark кажется чтобы зависнуть от фактического вычисления, с пользовательским интерфейсом, говорящим «работает», не делая никакого прогресса вообще:
">
Если я позволю этому работать некоторое время, он никогда не добивается никакого прогресса, но через пару часов начинает выдавать java.lang.OutOfMemoryError: GC overhead limit exceeded
ошибок, которые, как кажется, появляются в виде предупреждений (вместо того, чтобы прямо убивать работу).
Когда я искал других людей, имеющих моя проблема, все остальные потоки, которые я обнаружил, относятся либо к людям, выполняющим большие объединения, либо к groupBys, склонным к перекосу, что не является моей проблемой (так как я просто применяю функцию к каждому элементу в наборе данных ), или проблемы с запуском на YARN или каком-либо другом кластере, что также не относится ко мне, поскольку я работаю локально.
Большое спасибо за вашу помощь!