У меня есть потоковое приложение Kafka, которое подписывается на множество тем, и каждая тема имеет много разделов.
Когда я создаю топологию приложения и запускаю ее, знаю ли я, какие разделы каких тем назначены текущему экземпляру моего приложения? Я хочу знать, что это независимо от того, какие записи обрабатываются или нет этим экземпляром еще.
Я знаю, что когда я получаю запись, тогда я могу сделать processorContext.partition()
и processorContext.topic()
, чтобы получить информацию о разделе / теме текущей обрабатываемой записи. Но я не ищу этого.
Я ищу эквивалент KafkaConsumer.assigment
на стороне потоков кафки.
Я также попробовал следующий код, но получаю s как 0.
<Prepare builder and sconfig>
kafkaStream = new KafkaStreams (builder, sconfig);
kafkaStream.start ();
Collection<StreamsMetadata> s = kafkaStream.allMetadata();
System.out.println("StreamsMetadata: size is " + s.size());
for (StreamsMetadata m : s) {
Set<TopicPartition> tp = m.topicPartitions();
System.out.println ("TopicPartition: " + tp.toString());
}