Spark структурированные сообщения подтверждения потоковой передачи - PullRequest
0 голосов
/ 11 февраля 2020

Я использую Spark Structured Streaming для чтения из топики Кафки c (скажем topic1) и использую SINK для записи в другие topi c (topic1-result). Я вижу, что сообщения не удаляются из Topic1 после записи в другую topi c с использованием Sink.

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .load()

//SINK to another topic 
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("checkpointLocation", "/tmp/checkpoint1")
  .option("topic", "topic1-result")
  .start()

В документации сказано, что мы не можем использовать автоматическую фиксацию для структурированных потоков enable.auto.commit: Kafka source doesn’t commit any offset.

но как подтвердить сообщения и удалить обработанные сообщения из топи c (topic1)

1 Ответ

2 голосов
/ 12 февраля 2020

Два соображения:

  1. Сообщения не удаляются из Kafka после того, как вы их подтвердили. Когда ваш потребитель выполняет коммит, Kafka увеличивает смещение этого topi c относительно созданной группы потребителей. Но сообщения остаются в topi c в зависимости от времени хранения, настроенного для topi c.

  2. Действительно, источник Kafka не выполняет фиксацию, поток хранится смещение, которое указывает на следующее сообщение в контрольной точке потоковой передачи dir. Поэтому, когда вы возобновляете поток, требуется последнее смещение для его получения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...