При использовании кафки я хочу получить смещение раздела.
Однако, когда я вызываю позицию, я получаю ошибку:
Вы можете проверить позицию только для разделов, назначенных этому
потребитель.
Есть идеи, как это исправить?
def main(args: Array[String]): Unit = {
val topicName = "mystique-gorilla-grodd-change-stream-samplemart-samplelistener-p8"
val ku = new KafkaUnit(5000, 12345)
ku.setKafkaBrokerConfig("log.dirs", s"/tmp/kafka-logs-${UUID.randomUUID}")
ku.startup()
ku.createTopic(topicName)
val consumer = KafkaConsumerConnectionFactory(KafkaConsumerDetails("region",
List(ku.getKafkaConnect), "", Map(), "", "", 10000).consumerKafkaBrokers,
s"dd-functional-tester-${UUID.randomUUID.toString}").makeConnection
consumer.subscribe(java.util.Collections.singletonList(topicName))
consumer.poll(Duration.ofSeconds(1))
consumer.position(new TopicPartition(topicName, 0))
}