consumer.position возвращает исключение «Вы можете проверить позицию только для разделов, назначенных этому потребителю» даже после опроса - PullRequest
0 голосов
/ 11 января 2019

При использовании кафки я хочу получить смещение раздела. Однако, когда я вызываю позицию, я получаю ошибку:

Вы можете проверить позицию только для разделов, назначенных этому потребитель.

Есть идеи, как это исправить?

def main(args: Array[String]): Unit = {
    val topicName = "mystique-gorilla-grodd-change-stream-samplemart-samplelistener-p8"
    val ku = new KafkaUnit(5000, 12345)
    ku.setKafkaBrokerConfig("log.dirs", s"/tmp/kafka-logs-${UUID.randomUUID}")
    ku.startup()
    ku.createTopic(topicName)
    val consumer = KafkaConsumerConnectionFactory(KafkaConsumerDetails("region",
      List(ku.getKafkaConnect), "", Map(), "", "", 10000).consumerKafkaBrokers,
      s"dd-functional-tester-${UUID.randomUUID.toString}").makeConnection
    consumer.subscribe(java.util.Collections.singletonList(topicName))
    consumer.poll(Duration.ofSeconds(1))

    consumer.position(new TopicPartition(topicName, 0))
}
...