Чтение нескольких тем в загрузке Kafka Listner Spring - PullRequest
0 голосов
/ 23 апреля 2020

У меня есть две разные темы для чтения и публикации обработанных данных в веб-сервис. У меня есть условие, я должен полностью прочитать сообщения из theme1 и убедиться, что нет сообщений из theme1, я должен прочитать сообщения из topic2 и обработать его. В случае, если я начинаю читать сообщения из theme2 и получаю сообщения из topic1, мне приходится приостанавливать обработку сообщений из topic2 и читать сообщения из topic1.

Мне как-то удалось это сделать с помощью KafkaListenerEndpointRegistry. Код ListnerConfig

@Bean(kafkaListenerContainerTopic1Factory)
 public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic1Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setIdleEventInterval(60000L);
        factory.setBatchListener(true);
        return factory;
    }


    @Bean("kafkaListenerContainerTopic2Factory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic2Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

Код Listner

@KafkaListener(id = "first-listener", topics = "topic1", containerFactory = "kafkaListenerContainerTopic1Factory")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets)  {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received first='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }





@KafkaListener(id = "second-listener", topics = "topic2", containerFactory = "kafkaListenerContaierTopic2Factory" , autoStartup="false" )
    public void receiveRel(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received second='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }

    @EventListener()
    public void eventHandler(ListenerContainerIdleEvent event) {
        LOG.info("Inside event");
        this.registry.getListenerContainer("second-listener").start();
    }

Я также должен иметь возможность управлять этими темами, когда у меня запущено несколько экземпляров приложения, например, при развертывании этого кода в OpenShift я могу управлять ими в модулях.

1 Ответ

0 голосов
/ 23 апреля 2020

Как я понимаю, вы хотите расставить приоритеты в сообщениях из темы1 над темой2. Это невозможно напрямую с библиотекой Кафки. Вам нужно будет реализовать пользовательскую оболочку поверх Kafka для достижения приоритетов. Вы можете привести пример здесь: https://github.com/flipkart-incubator/priority-kafka-client

...