Kafka Consumer Thread подписывается на неопределенный срок - ReactiveKafkaConsumerTemplate - PullRequest
0 голосов
/ 20 марта 2020

Я вижу проблему, из-за которой через некоторое время, случайно, потребители перестают потреблять сообщение от topi c, и не было выдано никакой ошибки. В topi c не так много сообщений, потому что это тестовая среда.

Отставание потребителя для этой топики c будет> 0 и будет зависать от этого числа (как будто его не потребляет потребитель) бесконечно. Я проверил topi c с помощью инструмента kafka, я вижу, что раздел назначен потребителю.

Я использую несколько потоков потребителей, потребляющих топи c, вот код:

  public Runnable getRunnable() {
    return () -> {
      final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = 
          kafkaLib.createKafkaConsumerTemplate();
      kafkaConsumerTemplate.receive()
          .concatMap(record -> {
            // process message and commit offset

          })
          .subscribe();
    };
  }

  @Override
  public void run(ApplicationArguments args) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 2; i++) {
      executorService.execute(getRunnable());
    }

}

Мой вопрос: если нет сообщение будет отправлено в topi c в течение определенного периода времени, возможно ли, что kafkaConsumperTemplate.receive () остановит / выйдет / отменит подписку самостоятельно? Или, возможно, поток завершится как-то после окончания подписки?

Используя реактивного потребителя kafka, больше нет аннотации @KafkaListener, поэтому я использую ApplicationRunner и порождаю потоки потребителей. Есть ли другой альтернативный правильный способ запуска реактивных потребителей кафки?

...