Я использую Spring впервые и пытаюсь реализовать общую очередь, в которой слушатель Kafka помещает сообщения в общую очередь, а ThreadManager, который в конечном итоге будет выполнять что-то многопоточное, с элементами, которые он удаляет из общей очереди.Вот моя текущая реализация:
Слушатель:
@Component
public class Listener {
@Autowired
private QueueConfig queueConfig;
private ExecutorService executorService;
private List<Future> futuresThread1 = new ArrayList<>();
public Listener() {
Properties appProps = new AppProperties().get();
this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
}
//TODO: how can I pass an approp into this annotation?
@KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
public void listener(ConsumerRecord<?, ?> record) throws InterruptedException, ExecutionException
{
futuresThread1.add(executorService.submit(new Runnable() {
@Override public void run() {
try{
queueConfig.blockingQueue().put(record);
// System.out.println(queueConfig.blockingQueue().take());
} catch (Exception e){
System.out.print(e.toString());
}
}
}));
}
}
Очередь:
@Configuration
public class QueueConfig {
private Properties appProps = new AppProperties().get();
@Bean
public BlockingQueue<ConsumerRecord> blockingQueue() {
return new ArrayBlockingQueue<>(
Integer.parseInt(appProps.getProperty("blockingQueueSize"))
);
}
}
ThreadManager:
@Component
public class ThreadManager {
@Autowired
private QueueConfig queueConfig;
private int threads;
public ThreadManager() {
Properties appProps = new AppProperties().get();
this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
}
public void run() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threads);
try {
while (true){
queueConfig.blockingQueue().take();
}
} catch (Exception e){
System.out.print(e.toString());
executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
}
}
Наконец,основной поток, из которого все начинается:
@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
SpringApplication.run(SourceAccountListenerApp.class, args);
ThreadManager threadManager = new ThreadManager();
try{
threadManager.run();
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
Проблема
Когда я запускаю это в отладчике, я могу сказать, что слушатель добавляет вещи в очередь,Когда ThreadManager снимает общую очередь, он сообщает, что очередь пуста, и я получаю NPE.Кажется, что автоматическое подключение не работает для подключения очереди, которую слушатель использует, к ThreadManager.Любая помощь приветствуется.