Явно запустите Kafka Consumer In Main после запуска метода - PullRequest
0 голосов
/ 07 ноября 2019

У меня есть служба весенней загрузки, которая использует тему кафки. Когда я потребляю, я выполняю определенные задачи в сообщении kafka. Прежде чем я смогу выполнить эти операции, мне нужно подождать, пока служба загрузит некоторые данные в кеши, которые я настроил. Моя проблема в том, что если я установил для kafka автомата автозапуск, он начинает потреблять до загрузки кеша и выдает ошибку.

Я пытаюсь явно запустить потребителя после загрузки кеша, однако я получаю исключения нулевого указателя.

@Configuration
public class KafkaConfig {

    @Value("${kafka.server}")
    String server;

    @Value("${kafka.port}")
    String port;

    @Value("${kafka.group.id}")
    String groupid;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server+":"+port);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // config.put("security.protocol","SASL_PLAINTEXT");
        // config.put("sasl.kerberos.service.name","kafka");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setAutoStartup(false);
        return factory;
    }

}

KafkaListener

    @Service
    public class KafkaConsumer {

    @Autowired
    AggregationService aggregationService;

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    private final CounterService counterService;

    public KafkaConsumer(CounterService counterService) {
        this.counterService = counterService;
    }

    @KafkaListener(topics = "gliTransactionTopic", group = "gliDecoupling", id = "gliKafkaListener")
    public boolean consume(String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
            @Header(KafkaHeaders.OFFSET) Long offset) throws ParseException {
        System.out.println("Inside kafka listener :" + message+" partition :"+partition.toString()+" offset :"+offset.toString());
    aggregationService.run();
            return true;
        }



}

service Для запуска остановка

    @Service
    public class DecouplingController {

        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;



        public void stop() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
                    .getListenerContainer("gliKafkaListener");
            listenerContainer.stop();
        }

        public void start() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
                    .getListenerContainer("gliKafkaListener");
            listenerContainer.start();
        }


    }

основной метод

    @SpringBootApplication
    public class DecouplingApplication {

        Ignite ignite;


        static IgniteCache<Long, MappingsEntity> mappingsCache;

        public static void main(String[] args) {
            SpringApplication.run(DecouplingApplication.class, args);

            Ignition.setClientMode(true);
            Ignite ignite = Ignition.ignite("ignite");
            loadCaches(ignite);




        }

        public static boolean loadCaches(Ignite ignite) {

            mappingsCache = ignite.getOrCreateCache("MappingsCache");


            mappingsCache.loadCache(null);

            System.out.println("Data Loaded");

            DecouplingController dc=new DecouplingController();
            dc.start();

            return true;
        }

    }

Ниже приведено исключение

    Data Loaded
    Exception in thread "main" java.lang.NullPointerException
        at com.ignite.spring.decoupling.controller.DecouplingController.start(DecouplingController.java:126)
        at com.ignite.spring.decoupling.DecouplingApplication.loadCaches(DecouplingApplication.java:64)
        at com.ignite.spring.decoupling.DecouplingApplication.main(DecouplingApplication.java:37)

1 Ответ

1 голос
/ 07 ноября 2019

Вместо того, чтобы вручную создавать объект DecouplingController, автоматически связывайте зависимость в DecouplingApplication.

@Autowired

DecouplingController deDecouplingController;

ApplicationContext, который имеет дело с автоматически связанными зависимостями, не знает об объекте, который вы вручную создали с помощью "new». Автосоединение kafkaListenerEndpointRegistry неизвестно для нового объекта DecouplingController, который вы создали.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...