Пауза и возобновление работы KafkaConsumer в SparkStreaming - PullRequest
0 голосов
/ 17 июня 2020

:)

Я оказался в (странной) ситуации, когда, вкратце, я не хочу потреблять какие-либо новые записи от Kafka, поэтому пауза потребление sparkStreaming ( InputDStream [ConsumerRecord] ) для всех разделов в топи c, выполните некоторые операции и, наконец, возобновите потребление записей.

Прежде всего ... возможно ли это?

Я пробовал вот так:

var consumer: KafkaConsumer[String, String] = _    
consumer = new KafkaConsumer[String, String](properties)    
consumer.subscribe(java.util.Arrays.asList(topicName))

consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())

, но получил это:

println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]

Любая помощь чтобы понять, что мне не хватает и почему я получаю пустые результаты, когда ясно, что потребителю назначены разделы, будет приветствоваться!

Версии : Kafka: 0.10 Spark: 2.3.0 Scala: 2,11,8

1 Ответ

2 голосов
/ 17 июня 2020

Да, это возможно. Добавьте контрольную точку в свой код и передайте путь постоянного хранилища (локальный диск, S3, HDFS)

, и всякий раз, когда вы начинаете / возобновляете свою работу, он будет получать информацию о группе потребителей Kafka у потребителя смещения от контрольной точки и начать обработку с того места, где она была остановлена.

val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

Spark Check- = указывает механизм не только для сохранения смещения, но и для сохранения состояния сериализации вашего DAG этапов и заданий . Поэтому всякий раз, когда вы перезапускаете задание с новым кодом, оно будет

  1. Прочитать и обработать сериализованные данные
  2. Очистить кешированные этапы DAG, если в вашем приложении Spark есть какие-либо изменения кода
  3. Возобновить обработку с новых данных с последним кодом.

Теперь здесь чтение с диска - это всего лишь одноразовая операция , необходимая Spark для загрузки Kafka Offset, DAG и старые неполные обработанные данные.

После этого данные всегда будут сохраняться на диск с заданным интервалом контрольной точки или по умолчанию.

Потоковая передача Spark предоставляет возможность указать идентификатор группы Kafka, но Структурированный поток Spark не поддерживает.

...