Кафка Тема не стирается, когда потребитель Spark читает с нее - PullRequest
0 голосов
/ 12 января 2019

Я использую следующий потребительский код в Spark для чтения из темы Кафки:

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", topicName)
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Код читается из Темы, как и ожидалось, но содержимое Темы не стирается в результате этого чтения. Повторное выполнение приводит к тому, что один и тот же набор сообщений возвращается снова и снова.

Что я должен сделать, чтобы сообщения были удалены из темы после прочтения?

Ответы [ 2 ]

0 голосов
/ 12 января 2019

Как crikcet_007 упомянутый Kafka не удаляет журналы после потребления. Вы можете управлять хранением журналов в Kafka, используя политику размера или параметры времени.

log.retention.bytes - максимальный размер журнала перед его удалением

log.retention.hours - количество часов, в течение которых файл журнала хранится перед его удалением

log.retention.minutes - количество минут для сохранения файла журнала

log.retention.ms - количество миллисекунд для хранения файла журнала

Подробнее об этих параметрах можно прочитать здесь

Вдобавок к этому дополнительному механизму для хранения журнала - сжатие журнала. Установив следующие параметры, вы можете управлять сжатием журнала

log.cleanup.policy

log.cleaner.min.compaction.lag.ms

Подробнее об этом можно прочитать здесь

0 голосов
/ 12 января 2019

Кафка не удаляет тематические сообщения при потреблении

Ваш код Spark является частью группы потребителей Kafka , и он должен был бы подтвердить, что сообщение было прочитано, и зафиксировать эти смещения , что, как я считаю, Spark делает по-своему, периодически, по умолчанию, но вы можете отключить это, установив для параметра enable.auto.commit значение false, что настоятельно рекомендуется, поскольку вы захотите контролировать, успешно ли Spark обработал коллекцию записей.

Контрольная точка или отправка смещений в долговременное хранилище некоторые способы сохранить смещения в случае перезапуска / сбоя задачи и не перечитывать те же данные

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...