:)
Я оказался в (странной) ситуации, когда, вкратце, я не хочу потреблять какие-либо новые записи от Kafka, поэтому пауза потребление sparkStreaming ( InputDStream [ConsumerRecord] ) для всех разделов в топи c, выполните некоторые операции и, наконец, возобновите потребление записей.
Прежде всего ... возможно ли это?
Я пробовал вот так:
var consumer: KafkaConsumer[String, String] = _
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())
, но получил это:
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]
Любая помощь чтобы понять, что мне не хватает и почему я получаю пустые результаты, когда ясно, что потребителю назначены разделы, будет приветствоваться!
Версии : Kafka: 0.10 Spark: 2.3.0 Scala: 2,11,8