Как остановить Spark Structured Streaming от заполнения HDFS - PullRequest
0 голосов
/ 13 марта 2019

У меня есть задача Spark Structured Streaming, выполняемая в AWS EMR, которая, по сути, представляет собой объединение двух входных потоков в течение одного минуты. Входные потоки имеют 1-минутный водяной знак. Я не занимаюсь агрегацией. Я записываю результаты в S3 «вручную» с forEachBatch и foreachPartition на пакет, который преобразует данные в строку и записывает в S3.

Я хотел бы запустить это в течение длительного времени, то есть "навсегда", но, к сожалению, Spark медленно заполняет хранилище HDFS в моем кластере и в конечном итоге умирает из-за этого.

Кажется, что существует два типа данных, которые накапливаются. Регистрирует файлы /var и .delta, .snapshot в /mnt/tmp/.../. Они не удаляются, когда я убиваю задачу с помощью CTRL + C (или в случае использования yarn с yarn application kill), я должен удалить их вручную.

Я запускаю свою задачу с spark-submit. Я попытался добавить

--conf spark.streaming.ui.retainedBatches=100 \
--conf spark.streaming.stopGracefullyOnShutdown=true \
--conf spark.cleaner.referenceTracking.cleanCheckpoints=true \
--conf spark.cleaner.periodicGC.interval=15min \
--conf spark.rdd.compress=true

без эффекта. Когда я добавляю --master yarn, пути, в которых хранятся временные файлы, немного меняются, но проблема их накопления со временем сохраняется. Добавление --deploy-mode cluster, похоже, усугубляет проблему, так как кажется, что записывается больше данных.

Раньше в моем коде было Trigger.ProcessingTime("15 seconds), но я удалил его, когда прочитал, что Spark может не выполнить очистку после себя, если время запуска слишком мало по сравнению с временем вычисления. Это, похоже, немного помогло, HDFS заполняется медленнее, но временные файлы все еще накапливаются.

Если я не присоединяюсь к двум потокам, а просто select для обоих и union результатов для записи их в S3, накопление cruft int /mnt/tmp не произойдет. Может быть, мой кластер слишком мал для входных данных?

Я хотел бы понять, почему Spark пишет эти временные файлы и как ограничить занимаемое ими пространство. Я также хотел бы знать, как ограничить объем пространства, используемого журналами.

1 Ответ

0 голосов
/ 15 апреля 2019

Spark заполняет HDFS журналами из-за https://issues.apache.org/jira/browse/SPARK-22783

Необходимо установить spark.eventLog.enabled=false, чтобы журналы не создавались.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...