kafkaendpointlistenerregistry.start () выдает исключение с нулевым указателем - PullRequest
1 голос
/ 16 июня 2020

У меня есть требование, в котором я хочу запустить потребителя Kakfa вручную.

Код:

class    Dummy implements    
        ConsumerSeekAware
{

@Autowired     
KafkaListenerEndpointRegistry registry;

CountDownLatch latch;

@Autowired    
ConcurrentKafkaListenerContainerFactory     factory;

onIdleEvent(){
    latch.countdown()
}       
@KafkaListener(id="myContainer", 
   topics="mytopic",  
  autoStartup="false")    
public void listen() {}


@Scheduled(cron=" some time ")
    void do_some_consumption(){

latch = new CountDownLatch(1);    
this.registry.getListenerContainer("myContainer").start();    
    latch.await();
        do processing


   this.registry.getListenerContainer("myContainer").stop()

}
}

Я сделал bean-компонент ConcurrentKafkaListenerContainerFactory со всеми реквизитами в другом моем классе Config, am Autowiring здесь.

Однако я получаю исключение с нулевым указателем, когда запускаю свой контейнер с помощью this.registry.getListenerContainer ("myContainer"). start ()

java.lang.NullPointerException: null
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1 Ответ

0 голосов
/ 17 июня 2020

Я только что скопировал ваш код в приложение Spring Boot (которое автоматически настраивает фабрики); и все работает как положено ...

@SpringBootApplication
@EnableScheduling
public class So62412316Application {

    public static void main(String[] args) {
        SpringApplication.run(So62412316Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("mytopic", "foo");
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("mytopic").partitions(1).replicas(1).build();
    }

}

@Component
class Dummy implements ConsumerSeekAware {

    @Autowired
    KafkaListenerEndpointRegistry registry;

    CountDownLatch latch;

    @Autowired
    ConcurrentKafkaListenerContainerFactory factory;

    @EventListener
    public void onIdleEvent(ListenerContainerIdleEvent event) {
        System.out.println(event);
        latch.countDown();
    }

    @KafkaListener(id = "myContainer", topics = "mytopic", autoStartup = "false")
    public void listen(String in) {
        System.out.println(in);
    }

    @Scheduled(initialDelay = 5_000, fixedDelay = 60_000)
    void do_some_consumption() throws InterruptedException {

        latch = new CountDownLatch(1);
        this.registry.getListenerContainer("myContainer").start();
        latch.await();

        this.registry.getListenerContainer("myContainer").stop();
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5000
...