Как управлять памятью HDFS с помощью контрольных точек структурированной потоковой передачи - PullRequest
0 голосов
/ 07 января 2019

У меня есть длительное структурированное потоковое задание, которое использует несколько тем Kafka и агрегируется в скользящем окне. Мне нужно понять, как контрольные точки управляются / очищаются в HDFS.

Задания работают нормально, и я могу восстановиться после неудачного шага без потери данных, однако, я вижу, что использование HDFS растет день ото дня. Я не могу найти документацию о том, как Spark управляет / убирает контрольные точки. Ранее контрольные точки хранились на s3, но это оказалось довольно дорогостоящим с большим количеством маленьких файлов, которые читаются / записываются.

query = formatted_stream.writeStream \
                        .format("kafka") \
                        .outputMode(output_mode) \
                        .option("kafka.bootstrap.servers", bootstrap_servers) \
                        .option("checkpointLocation", "hdfs:///path_to_checkpoints") \
                        .start()

Насколько я понимаю, контрольные точки должны быть очищены автоматически; через несколько дней я просто вижу, что моя HDFS линейно увеличивается. Как я могу убедиться, что контрольные точки управляются и HDFS не хватает места?

Принятый ответ на Очистка контрольной точки структурированной потоковой передачи Spark сообщает, что структурированная потоковая передача должна решить эту проблему, но не как и как ее можно настроить.

1 Ответ

0 голосов
/ 07 января 2019

Как вы можете видеть в коде Checkpoint.scala , механизм контрольных точек сохраняет данные последних 10 контрольных точек, но это не должно быть проблемой в течение пары дней.

Обычная причина этого заключается в том, что RDD, которые вы сохраняете на диске, также линейно растут со временем. Это может быть связано с тем, что некоторые RDD не нужны для сохранения.

Вам необходимо убедиться, что при использовании структурированной потоковой передачи не существует ни одного RDD, который должен расти, и который необходимо сохранить. Например, если вы хотите рассчитать точное количество отдельных элементов по столбцу набора данных, вам нужно знать полные входные данные (что означает постоянные данные, которые линейно увеличиваются со временем, если у вас есть постоянный приток данных на пакет ). Вместо этого, если вы можете работать с приблизительным числом, вы можете использовать алгоритмы, такие как HyperLogLog ++, которые обычно требуют гораздо меньше памяти для компромисса с точностью.

Имейте в виду, что если вы используете Spark SQL, вы можете дополнительно изучить, во что превращаются ваши оптимизированные запросы, поскольку это может быть связано с тем, как Catalyst оптимизирует ваш запрос. Если нет, то, возможно, Catalyst оптимизировал бы ваш запрос, если бы вы это сделали.

В любом случае, дальнейшая мысль: если использование контрольной точки увеличивается со временем, это должно быть отражено в вашей потоковой работе, также потребляющей больше оперативной памяти линейно со временем, поскольку контрольная точка - это всего лишь сериализация контекста Spark (плюс константа метаданные). Если это так, проверьте SO на наличие связанных вопросов, таких как , почему использование памяти Spark Worker со временем увеличивается? .

Также имейте в виду, какие RDD вы называете .persist() (и какой уровень кэша, так что вы можете метаданные на дисковые RDD и загружать их только частично в контекст Spark одновременно).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...