Kafka Consumer Assignment возвращает пустой набор - PullRequest
0 голосов
/ 23 января 2019

Я подписался на тему Кафки, как показано ниже. Мне нужно запустить некоторую логику только после того, как потребителю был назначен раздел.

Однако consumer.assignment() возвращается как пустой набор независимо от того, как долго я жду. Если у меня нет цикла while, а затем выполняется consumer.poll(), я получаю записи из этой темы. Может кто-нибудь сказать мне, почему это происходит?

    consumer.subscribe(topics);
    Set<TopicPartition> assigned=Collections.emptySet();
    while(isAssigned) {
          assigned = consumer.assignment();
          if(!assigned.isEmpty()) {
              isAssigned= false;
          }
    }

//consumer props
Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092,yyy:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://xxx:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("max.poll.records", "100");

1 Ответ

0 голосов
/ 24 января 2019

Пока вы не позвоните poll(), потребитель просто бездействует.

Только после вызова poll() он установит соединение с кластером, получит назначенные разделы и попытается извлечь сообщения.

Это упоминается в Javadoc :

После подписки на ряд тем потребитель автоматически присоединяется к группе при вызове poll (Duration).

Как только вы начнете звонить poll(), вы можете использовать assignment() или даже зарегистрировать ConsumerRebalanceListener, чтобы получать уведомления, когда разделы назначаются или отменяются из вашего потребительского экземпляра.

...