У меня есть задача Spark Structured Streaming, выполняемая в AWS EMR, которая, по сути, представляет собой объединение двух входных потоков в течение одного минуты. Входные потоки имеют 1-минутный водяной знак. Я не занимаюсь агрегацией. Я записываю результаты в S3 «вручную» с forEachBatch
и foreachPartition
на пакет, который преобразует данные в строку и записывает в S3.
Я хотел бы запустить это в течение длительного времени, то есть "навсегда", но, к сожалению, Spark медленно заполняет хранилище HDFS в моем кластере и в конечном итоге умирает из-за этого.
Кажется, что существует два типа данных, которые накапливаются. Регистрирует файлы /var
и .delta
, .snapshot
в /mnt/tmp/.../
. Они не удаляются, когда я убиваю задачу с помощью CTRL + C (или в случае использования yarn
с yarn application kill
), я должен удалить их вручную.
Я запускаю свою задачу с spark-submit
. Я попытался добавить
--conf spark.streaming.ui.retainedBatches=100 \
--conf spark.streaming.stopGracefullyOnShutdown=true \
--conf spark.cleaner.referenceTracking.cleanCheckpoints=true \
--conf spark.cleaner.periodicGC.interval=15min \
--conf spark.rdd.compress=true
без эффекта. Когда я добавляю --master yarn
, пути, в которых хранятся временные файлы, немного меняются, но проблема их накопления со временем сохраняется. Добавление --deploy-mode cluster
, похоже, усугубляет проблему, так как кажется, что записывается больше данных.
Раньше в моем коде было Trigger.ProcessingTime("15 seconds)
, но я удалил его, когда прочитал, что Spark может не выполнить очистку после себя, если время запуска слишком мало по сравнению с временем вычисления. Это, похоже, немного помогло, HDFS заполняется медленнее, но временные файлы все еще накапливаются.
Если я не присоединяюсь к двум потокам, а просто select
для обоих и union
результатов для записи их в S3, накопление cruft int /mnt/tmp
не произойдет. Может быть, мой кластер слишком мал для входных данных?
Я хотел бы понять, почему Spark пишет эти временные файлы и как ограничить занимаемое ими пространство. Я также хотел бы знать, как ограничить объем пространства, используемого журналами.