Как перемотать смещения Kafka в потоковом чтении с искровой структурой - PullRequest
0 голосов
/ 11 мая 2018

У меня есть задание Spark Structured Streaming, которое настроено на чтение данных из Kafka. Пожалуйста, пройдите код, чтобы проверить readStream() с параметрами, чтобы прочитать последние данные из Kafka.

Я понимаю, что readStream() читает с первого смещения при запуске нового запроса, а не при возобновлении.

Но я не знаю, как начинать новый запрос каждый раз, когда перезапускаю свою работу в IntelliJ.

val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", AppProperties.getProp(AppConstants.PROPS_SERVICES_KAFKA_SERVERS))
  .option("subscribe", AppProperties.getProp(AppConstants.PROPS_SDV_KAFKA_TOPICS))
  .option("failOnDataLoss", "false")
  .option("startingOffsets","earliest")
  .load()
  .selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")

Я также пытался установить смещения с помощью """{"topicA":{"0":0,"1":0}}"""

Следом идет мой писатель

val query = kafkaStreamingDF
  .writeStream
  .format("console")
  .start()

Каждый раз, когда я перезапускаю свою работу в IntelliJ IDE, журналы показывают, что смещение было установлено как самое позднее, а не 0 или самое раннее.

Есть ли способ очистки моей контрольной точки, в этом случае я не знаю, где находится каталог контрольных точек, потому что в приведенном выше случае я не указываю никаких контрольных точек.

Ответы [ 2 ]

0 голосов
/ 11 августа 2018

Попробуйте настроить .option("kafka.client.id", "XX"), чтобы использовать другой client.id.

0 голосов
/ 11 мая 2018

Кафка полагается на свойство auto.offset.reset, чтобы заботиться о Управлении смещением .

По умолчанию установлено значение « latest », что означает, что при отсутствии действительного смещения потребитель начнет чтение из самых новых записей (записей, которые были записаны после того, как потребитель начал работать). Альтернативой является « самое раннее », что означает, что при отсутствии действительного смещения потребитель будет считывать все данные в разделе, начиная с самого начала.

По вашему вопросу вы хотите прочитать все данные из темы. Поэтому установка «startingOffsets» на «earliest» должна работать. Но также убедитесь, что для enable.auto.commit установлено значение false.

Установка enable.auto.commit на true означает, что смещения фиксируются автоматически с частотой, контролируемой конфигурацией auto.commit.interval.ms.

Если для этого параметра установлено значение true, смещения автоматически отправляются в Kafka, когда сообщения считываются из Kafka, что не обязательно означает, что Spark завершил обработку этих сообщений. Чтобы включить точное управление фиксацией смещения, установите для параметра Kafka enable.auto.commit значение false.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...