Когда я пытаюсь использовать executorService (фиксированный пул потоков из 50) с Kafka, некоторые потоки удаляются из очереди и не обрабатываются.
Например, для приведенного ниже кода: количество строк, напечатанных в журналах: «Inside Kafka Listener»: 3000 «Обработка сообщения»: 2100 «Произошло исключение»: 0
@Named
public class KafkaListener {
ExecutorService executorService = Executors.newFixedThreadPool(50);
@Inject
KafkaMessageProcessor kafkaMessageProcessor;
@KafkaListener(
topics = "topicName",
containerFactory = "filterKafkaListenerContainerFactory")
public void listen(String message) {
try {
System.out.println("Inside Kafka Listener " + message);
executorService.submit(() -> {
System.out.println("Processing message");
kafkaMessageProcessor.process(message);
});
} catch (Exception e) {
System.out.println("Exception occurred :" + e);
}
}
}
Я ожидаюколичество строк в журналах должно быть равным, поскольку newFixedThreadPool использует очередь блокировки размером Integer.MAX_VALUE
.