У меня есть требование, в котором я хочу запустить потребителя Kakfa вручную.
Код:
class Dummy implements
ConsumerSeekAware
{
@Autowired
KafkaListenerEndpointRegistry registry;
CountDownLatch latch;
@Autowired
ConcurrentKafkaListenerContainerFactory factory;
onIdleEvent(){
latch.countdown()
}
@KafkaListener(id="myContainer",
topics="mytopic",
autoStartup="false")
public void listen() {}
@Scheduled(cron=" some time ")
void do_some_consumption(){
latch = new CountDownLatch(1);
this.registry.getListenerContainer("myContainer").start();
latch.await();
do processing
this.registry.getListenerContainer("myContainer").stop()
}
}
Я сделал bean-компонент ConcurrentKafkaListenerContainerFactory со всеми реквизитами в другом моем классе Config, am Autowiring здесь.
Однако я получаю исключение с нулевым указателем, когда запускаю свой контейнер с помощью this.registry.getListenerContainer ("myContainer"). start ()
java.lang.NullPointerException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)