почему KafkaMessageListenerContainer никогда не освобождает поток от исполнителя? - PullRequest
0 голосов
/ 26 декабря 2018

У меня есть программа с несколькими потоками, которые обращаются к общему ресурсу.Чтобы избежать условий гонки для общего ресурса, я решил сериализовать потоки с общим исполнителем с одним потоком.Я предпочитаю такой подход, чтобы мучить программу блокировками - моя программа не будет находиться под большой нагрузкой и не критична по времени.

Один из потоков читает сообщения с Kafka, вот как выглядит код для этогоflow:

@Bean
public KafkaMessageListenerContainer kafkaFlow(DefaultConsumerFactory consumerFactory, ExecutorService myExecutor) {
    ContainerProperties containerProperties = new ContainerProperties("myTopic");
    ... //more properties
    containerProperties.setConsumerTaskExecutor(new ConcurrentTaskExecutor(myExecutor));
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}

@Bean
public Executor myExecutor() {
    return Executors.newSingleThreadScheduledExecutor();
}

Проблема в том, что, пока поток Kafka работает нормально, другие потоки, кажется, никогда не получают шанс запустить.Это почему?Я использовал этот подход с другими контейнерами JMS, и они отлично работали ...

Заранее спасибо.

1 Ответ

0 голосов
/ 26 декабря 2018

Кафка требует, чтобы потребитель регулярно проводил опросы.в противном случае он отменяет назначения тем / разделов;контейнер использует выделенный поток для каждого потребителя.

Вам нужно будет poll() потребителю самостоятельно, а не использовать контейнер слушателя, если вы хотите поделиться потоком для других целей, но вам все равно нужноОбязательно poll() в пределах max.poll.interval.ms, чтобы Кафка не думал, что ваш потребитель мертв.

...