мой пользовательский класс, реализующий ConsumerAwareRebalanceListener, не работает - PullRequest
0 голосов
/ 04 ноября 2019

Я использую spring-kafka-2.2.7-RELEASE. Я пытаюсь зафиксировать событие, когда потребитель готов использовать сообщение, и пытаюсь использовать ConsumerAwareRebalanceListener, но он не работает. Пожалуйста, предложите.

@Component
public class ConsumerAwareRebalanceListenerImpl implements ConsumerAwareRebalanceListener {

  public void ConsumerAwareRebalanceListenerImpl(){
    System.out.println(" In ConsumerAwareRebalanceListenerImpl constructor");
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    partitions.forEach( item -> {
      TestConsumerConstants.consumerEventsMap.put("key-"+item.partition(), item.partition());
    });

  }

  @Override
  public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions){
    TestConsumerConstants.consumerEventsMap.put(consumer.toString(), partitions);
  }

}

1 Ответ

0 голосов
/ 04 ноября 2019

Я исправил проблему, и теперь она работает. Все, что мне нужно было сделать, это установить параметр containerProperty 'setConsumerRebalanceListener' объекта ConcurrentKafkaListenerContainerFactory в свой пользовательский экземпляр класса, как показано ниже

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (новый)factory.getContainerProperties (). setConsumerRebalanceListener (kafkaConsumerRebalanceListener);

...