Я использую Spring data Redis для потребления из потока Redis, использую приемник реактивного потока для прослушивания группы потребителей, но заметил, что поток Flux иногда преждевременно закрывается и больше не слушает новые сообщения, ипоток заканчивается преждевременно.
Код
StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder()
.build();
StreamReceiver.create(reactiveConnFactory, options)
.receiveAutoAck("CONSUMER_GRP", "CONSUMER_ID_1"), StreamOffset.create(
"CONSUMER_STREAM",
ReadOffset.lastConsumed()))
.doOnNext(msg -> LOG.info("Got [{}] message from stream", msg))
.flatMap(msg -> Mono.fromRunnable(() -> process("reactive", msg))
.subscribeOn(streamConsumerExecutor))
.onErrorResume(t -> Flux.empty())
.doOnCancel(() -> LOG.info("Consumer Stream was cancelled"))
.doOnComplete(() -> {
LOG.info("Consumer Stream Completed");
})
.doOnTerminate(() -> {
LOG.info("Consumer Stream terminated");
})
.subscribe();
Через некоторое время чтения сообщений из потока получите журнал о том, что "поток потребителей завершен"
версия : 2.2.0.RELEASE
Это ошибка или я что-то упустил, может ли кто-нибудь помочь?
ОБНОВЛЕНИЕ
Похоже, что для команд redis истекает время ожидания, поскольку я получаю RedisCommandTimeoutException, есть ли способ повторить процесс потоковой передачи при таких ошибках, а не отменить его. Также выяснилось, что это происходит в операции XREADGROUP, хотя при запуске через redis-cli nodejs выполнение той же команды работало нормально?