Я вижу проблему, из-за которой через некоторое время, случайно, потребители перестают потреблять сообщение от topi c, и не было выдано никакой ошибки. В topi c не так много сообщений, потому что это тестовая среда.
Отставание потребителя для этой топики c будет> 0 и будет зависать от этого числа (как будто его не потребляет потребитель) бесконечно. Я проверил topi c с помощью инструмента kafka, я вижу, что раздел назначен потребителю.
Я использую несколько потоков потребителей, потребляющих топи c, вот код:
public Runnable getRunnable() {
return () -> {
final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate =
kafkaLib.createKafkaConsumerTemplate();
kafkaConsumerTemplate.receive()
.concatMap(record -> {
// process message and commit offset
})
.subscribe();
};
}
@Override
public void run(ApplicationArguments args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; i++) {
executorService.execute(getRunnable());
}
}
Мой вопрос: если нет сообщение будет отправлено в topi c в течение определенного периода времени, возможно ли, что kafkaConsumperTemplate.receive () остановит / выйдет / отменит подписку самостоятельно? Или, возможно, поток завершится как-то после окончания подписки?
Используя реактивного потребителя kafka, больше нет аннотации @KafkaListener
, поэтому я использую ApplicationRunner и порождаю потоки потребителей. Есть ли другой альтернативный правильный способ запуска реактивных потребителей кафки?