Потоки отбрасываются из очереди, когда служба прослушивания с фиксированным пулом потоков используется с Kafka Listener - PullRequest
0 голосов
/ 08 октября 2019

Когда я пытаюсь использовать executorService (фиксированный пул потоков из 50) с Kafka, некоторые потоки удаляются из очереди и не обрабатываются.

Например, для приведенного ниже кода: количество строк, напечатанных в журналах: «Inside Kafka Listener»: 3000 «Обработка сообщения»: 2100 «Произошло исключение»: 0

@Named
public class KafkaListener {

    ExecutorService executorService = Executors.newFixedThreadPool(50);

    @Inject
    KafkaMessageProcessor kafkaMessageProcessor;

    @KafkaListener(
            topics = "topicName",
            containerFactory = "filterKafkaListenerContainerFactory")
    public void listen(String message) {
        try {
            System.out.println("Inside Kafka Listener " + message);
            executorService.submit(() -> {
                System.out.println("Processing message");
                kafkaMessageProcessor.process(message);
            });
        } catch (Exception e) {
            System.out.println("Exception occurred :" + e);
        }
    }
}

Я ожидаюколичество строк в журналах должно быть равным, поскольку newFixedThreadPool использует очередь блокировки размером Integer.MAX_VALUE.

...