Spring Data Redis Stream приемник завершает преждевременно - PullRequest
1 голос
/ 09 октября 2019

Я использую Spring data Redis для потребления из потока Redis, использую приемник реактивного потока для прослушивания группы потребителей, но заметил, что поток Flux иногда преждевременно закрывается и больше не слушает новые сообщения, ипоток заканчивается преждевременно.

Код

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder() 
            .build();
 StreamReceiver.create(reactiveConnFactory, options)
            .receiveAutoAck("CONSUMER_GRP", "CONSUMER_ID_1"), StreamOffset.create(
                        "CONSUMER_STREAM",
                        ReadOffset.lastConsumed()))
            .doOnNext(msg -> LOG.info("Got [{}] message from stream", msg))
            .flatMap(msg -> Mono.fromRunnable(() -> process("reactive", msg))
                  .subscribeOn(streamConsumerExecutor))
            .onErrorResume(t -> Flux.empty())
            .doOnCancel(() -> LOG.info("Consumer Stream was cancelled"))
            .doOnComplete(() -> {
               LOG.info("Consumer Stream Completed");
            })
            .doOnTerminate(() -> {
               LOG.info("Consumer Stream terminated");
            }) 
            .subscribe();


Через некоторое время чтения сообщений из потока получите журнал о том, что "поток потребителей завершен"

версия : 2.2.0.RELEASE

Это ошибка или я что-то упустил, может ли кто-нибудь помочь?

ОБНОВЛЕНИЕ

Похоже, что для команд redis истекает время ожидания, поскольку я получаю RedisCommandTimeoutException, есть ли способ повторить процесс потоковой передачи при таких ошибках, а не отменить его. Также выяснилось, что это происходит в операции XREADGROUP, хотя при запуске через redis-cli nodejs выполнение той же команды работало нормально?

...