Перезапуск задания PySpark не приводит к получению записей, которые были вставлены в Kafka Topi c, пока потребитель pyspark не работает. - PullRequest
1 голос
/ 08 мая 2020

Я выполняю задание pyspark, а поток данных идет от Kafka. Я пытаюсь воспроизвести сценарий в своей системе windows, чтобы узнать, что происходит, когда потребитель отключается, когда данные непрерывно передаются в Kafka.

Вот чего я ожидал.

  • производитель запускается и выдает сообщения 1, 2 и 3.
  • потребитель находится в сети и принимает сообщения 1, 2 и 3.
  • Теперь потребитель отключается по какой-то причине, хотя производитель выдает сообщения 4, 5 и 6 и так далее ...
  • когда появляется потребитель, я ожидаю, что он должен прочитать с того места, где остановился. Таким образом, потребитель должен иметь возможность читать сообщения 4, 5, 6 и так далее ....

Мое приложение pyspark не может достичь того, чего я ожидаю. вот как я создал Spark Session.

session.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "clickapijson")
  .option("startingoffsets" , "latest") \
  .load()

Я погуглил и собрал довольно много информации. Похоже, здесь уместен groupID. Kafka отслеживает смещения, считываемые каждым потребителем в определенном groupID. Если потребитель подписывается на topi c с groupId, скажем, G1, kafka регистрирует эту группу и consumerID и отслеживает эти groupID и ConsumerID. Если по какой-то причине потребителю необходимо отключить go и перезапустить его с тем же идентификатором группы, тогда кафка будет иметь информацию об уже прочитанных смещениях, поэтому потребитель будет читать данные с того места, где он остановился.

Это точно происходит, когда я использую следующую команду для вызова задания потребителя в CLI.

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic "clickapijson" --consumer-property group.id=test 

Теперь, когда мой производитель выдает сообщения 1, 2 и 3, потребитель может потреблять . Я убил текущее задание потребителя (файл .bat CLI) после прочтения 3-го сообщения. Мой производитель выдает сообщение 4, 5 и 6 и так далее .... Теперь я возвращаю свою работу потребителя (файл .bat CLI), и он может читать данные с того места, где они остановились (из сообщения 4). Я веду себя так, как я ожидал.

Я не могу сделать то же самое в pyspark.

когда я включаю option("group.id" , "test"), появляется сообщение об ошибке, в котором говорится, что опция Kafka group.id не поддерживается, поскольку указанные пользователем группы потребителей не используются для отслеживания смещений.

При наблюдении за вывод консоли, каждый раз, когда моя работа потребителя pyspark запускается, он создает новый groupID. Если мое задание pyspark ранее выполнялось с идентификатором группы и не удалось, то при перезапуске оно не получает тот же старый идентификатор группы. Он случайным образом получает новый groupID. Kafka имеет информацию о смещении предыдущего идентификатора группы, но не текущего вновь созданного идентификатора группы. Следовательно, мое приложение pyspark не может читать данные, введенные в Kafka, пока оно не работает.

Если это так, то не потеряю ли я свои данные, когда задание потребителя прекратится из-за какого-то сбоя?

Как я могу присвоить свой собственный групповой идентификатор приложению pyspark или как я могу перезапустить свое приложение pyspark с тем же старым идентификатором группы?

1 Ответ

2 голосов
/ 08 мая 2020

В текущей версии Spark (2.4.5) невозможно предоставить свой собственный group.id, поскольку он автоматически создается Spark (как вы уже заметили). Полная информация об управлении смещением в Spark, считывающем из Kafka, приведена здесь и обобщена ниже:

Обратите внимание, что следующие параметры Kafka не могут быть установлены, и источник или приемник Kafka будут генерировать исключение:

group.id : источник Kafka автоматически создает уникальный идентификатор группы для каждого запроса.

auto.offset.reset : Установите параметр источника startOffsets, чтобы указать, с чего начать. Структурированная потоковая передача управляет тем, какие смещения используются внутри , а не полагается на Потребителя kafka в этом. Это гарантирует, что никакие данные не будут упущены при динамической подписке на новые темы / разделы. Обратите внимание, что startOffsets применяется только при запуске нового потокового запроса, и что возобновление всегда будет начинаться с того места, где остановился запрос.

enable.auto.commit : источник Kafka не зафиксировать любое смещение.

Чтобы Spark мог запомнить, где он остановил чтение из Kafka, вам необходимо включить контрольную точку и указать путь для хранения файлов контрольных точек. В Python это будет выглядеть так:

aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()

Более подробная информация о контрольных точках приведена в документации Spark на Восстановление после сбоев с помощью контрольных точек .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...