Я выполняю задание pyspark, а поток данных идет от Kafka. Я пытаюсь воспроизвести сценарий в своей системе windows, чтобы узнать, что происходит, когда потребитель отключается, когда данные непрерывно передаются в Kafka.
Вот чего я ожидал.
- производитель запускается и выдает сообщения 1, 2 и 3.
- потребитель находится в сети и принимает сообщения 1, 2 и 3.
- Теперь потребитель отключается по какой-то причине, хотя производитель выдает сообщения 4, 5 и 6 и так далее ...
- когда появляется потребитель, я ожидаю, что он должен прочитать с того места, где остановился. Таким образом, потребитель должен иметь возможность читать сообщения 4, 5, 6 и так далее ....
Мое приложение pyspark не может достичь того, чего я ожидаю. вот как я создал Spark Session.
session.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "clickapijson")
.option("startingoffsets" , "latest") \
.load()
Я погуглил и собрал довольно много информации. Похоже, здесь уместен groupID. Kafka отслеживает смещения, считываемые каждым потребителем в определенном groupID. Если потребитель подписывается на topi c с groupId, скажем, G1, kafka регистрирует эту группу и consumerID и отслеживает эти groupID и ConsumerID. Если по какой-то причине потребителю необходимо отключить go и перезапустить его с тем же идентификатором группы, тогда кафка будет иметь информацию об уже прочитанных смещениях, поэтому потребитель будет читать данные с того места, где он остановился.
Это точно происходит, когда я использую следующую команду для вызова задания потребителя в CLI.
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic "clickapijson" --consumer-property group.id=test
Теперь, когда мой производитель выдает сообщения 1, 2 и 3, потребитель может потреблять . Я убил текущее задание потребителя (файл .bat CLI) после прочтения 3-го сообщения. Мой производитель выдает сообщение 4, 5 и 6 и так далее .... Теперь я возвращаю свою работу потребителя (файл .bat CLI), и он может читать данные с того места, где они остановились (из сообщения 4). Я веду себя так, как я ожидал.
Я не могу сделать то же самое в pyspark.
когда я включаю option("group.id" , "test")
, появляется сообщение об ошибке, в котором говорится, что опция Kafka group.id
не поддерживается, поскольку указанные пользователем группы потребителей не используются для отслеживания смещений.
При наблюдении за вывод консоли, каждый раз, когда моя работа потребителя pyspark запускается, он создает новый groupID. Если мое задание pyspark ранее выполнялось с идентификатором группы и не удалось, то при перезапуске оно не получает тот же старый идентификатор группы. Он случайным образом получает новый groupID. Kafka имеет информацию о смещении предыдущего идентификатора группы, но не текущего вновь созданного идентификатора группы. Следовательно, мое приложение pyspark не может читать данные, введенные в Kafka, пока оно не работает.
Если это так, то не потеряю ли я свои данные, когда задание потребителя прекратится из-за какого-то сбоя?
Как я могу присвоить свой собственный групповой идентификатор приложению pyspark или как я могу перезапустить свое приложение pyspark с тем же старым идентификатором группы?