Есть ли способ чтения из определенного смещения в потоке Кафки из потокового задания Spark? - PullRequest
0 голосов
/ 24 апреля 2019

Я пытаюсь зафиксировать смещения из задания потоковой передачи Spark в Kafka, используя следующее:

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // some time later, after outputs have completed
              ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

как я понял из этого вопроса:

Spark DStream от Kafka всегда начинается с начала

И это работает нормально, смещения фиксируются. Однако проблема заключается в том, что это асинхронно, что означает, что даже после того, как еще две коммиты смещения были отправлены по линии, Кафка все еще может удерживать смещение до двух коммитов ранее. Если в этот момент происходит сбой потребителя, и я возвращаю его обратно, он начинает читать сообщения, которые уже были обработаны.

Теперь из других источников, например, раздел комментариев здесь:

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

Я понял, что нет способа синхронно фиксировать смещения из потокового задания Spark (хотя есть один, если я использую потоки Кафки). Люди скорее предлагают сохранить смещения в базах данных, где вы сохраняете конечный результат ваших вычислений в потоке.

Теперь мой вопрос таков: Если я действительно сохраню текущее смещение чтения в своей базе данных, как мне начать чтение потока именно с этого смещения в следующий раз?

1 Ответ

0 голосов
/ 06 мая 2019

Я исследовал и нашел ответ на свой вопрос, поэтому я публикую его здесь для всех, кто может столкнуться с такой же проблемой:

  • Создайте объект Map с помощью org.apache.kafka.common.TopicPartition в качестве ключа и Long в качестве значения.Конструктор TopicPartition принимает два аргумента: название темы и раздел, из которого вы будете читать.Значением объекта Map является длинное представление смещения, из которого вы хотите прочитать поток.

    Map начальныйOffset = new HashMap <> ();startOffset.put (new TopicPartition ("topic_name", 0), 3332980L);

  • Считать содержимое потока в соответствующий JavaInputStream и предоставить ранее созданный объект Map в качестве аргумента дляМетод ConsumerStrategies.Subscribe ().

    final JavaInputDStream> stream = KafkaUtils.createDirectStream (jssc, LocationStrategies.PreferConsistent (), ConsumerStrategies.Subscribe (themes, kafkaParams, начальныйOffset)) *

    * 4*
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...