Я подписался на тему Кафки, как показано ниже. Мне нужно запустить некоторую логику только после того, как потребителю был назначен раздел.
Однако consumer.assignment()
возвращается как пустой набор независимо от того, как долго я жду. Если у меня нет цикла while, а затем выполняется consumer.poll()
, я получаю записи из этой темы. Может кто-нибудь сказать мне, почему это происходит?
consumer.subscribe(topics);
Set<TopicPartition> assigned=Collections.emptySet();
while(isAssigned) {
assigned = consumer.assignment();
if(!assigned.isEmpty()) {
isAssigned= false;
}
}
//consumer props
Properties props = new Properties();
props.put("bootstrap.servers", "xxx:9092,yyy:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://xxx:8081");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("max.poll.records", "100");