Как вы можете видеть в коде Checkpoint.scala , механизм контрольных точек сохраняет данные последних 10 контрольных точек, но это не должно быть проблемой в течение пары дней.
Обычная причина этого заключается в том, что RDD, которые вы сохраняете на диске, также линейно растут со временем. Это может быть связано с тем, что некоторые RDD не нужны для сохранения.
Вам необходимо убедиться, что при использовании структурированной потоковой передачи не существует ни одного RDD, который должен расти, и который необходимо сохранить. Например, если вы хотите рассчитать точное количество отдельных элементов по столбцу набора данных, вам нужно знать полные входные данные (что означает постоянные данные, которые линейно увеличиваются со временем, если у вас есть постоянный приток данных на пакет ). Вместо этого, если вы можете работать с приблизительным числом, вы можете использовать алгоритмы, такие как HyperLogLog ++, которые обычно требуют гораздо меньше памяти для компромисса с точностью.
Имейте в виду, что если вы используете Spark SQL, вы можете дополнительно изучить, во что превращаются ваши оптимизированные запросы, поскольку это может быть связано с тем, как Catalyst оптимизирует ваш запрос. Если нет, то, возможно, Catalyst оптимизировал бы ваш запрос, если бы вы это сделали.
В любом случае, дальнейшая мысль: если использование контрольной точки увеличивается со временем, это должно быть отражено в вашей потоковой работе, также потребляющей больше оперативной памяти линейно со временем, поскольку контрольная точка - это всего лишь сериализация контекста Spark (плюс константа метаданные). Если это так, проверьте SO на наличие связанных вопросов, таких как , почему использование памяти Spark Worker со временем увеличивается? .
Также имейте в виду, какие RDD вы называете .persist()
(и какой уровень кэша, так что вы можете метаданные на дисковые RDD и загружать их только частично в контекст Spark одновременно).