Spring Data Redis Streams (Reactive) - как правильно обрабатывать ошибки? - PullRequest
0 голосов
/ 25 февраля 2020

Я использую Redis Streams с Spring Data Redis 2.2.4. Одна вещь, которую я хочу выяснить, это , как правильно обрабатывать ошибки .

Для нереактивной модели мы устанавливаем ErrorHandler и EancelSubscriptionOnError равными StreamReadRequest. Затем мы можем контролировать, какое исключение должно отменить подписку в Streams.

Пример:

        StreamReadRequest<String> streamReadRequest = StreamReadRequest.builder(offset)
            .errorHandler(streamListener.getErrorHandler())
            .cancelOnError(e -> false)
            .consumer(consumer)
            .autoAck(true)
            .build();

Но для реактивной модели я не нахожу никаких API-интерфейсов для обработки ошибок для StreamReceiver. Когда происходит исключение из нашего обработчика бизнес-сообщений. Подписка на Redis Streams будет отменена, и сообщение больше не будет приниматься. Поэтому я должен убедиться, что в коде моей бизнес-логики c нет исключений. Смотрите пример ниже:

    public void init() {

        StreamOffset<String> offset = StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed());

        receiver.receiveAutoAck(consumer, offset)
            .flatMap(this::onMessage)
            .subscribe();

    }

    private Mono<Long> onMessage(ObjectRecord<String, MyEvent> message) {
        try {
            return redisTemplate.opsForStream().acknowledge(groupName, message);
        } catch (Exception e) {
            return Mono.just(-1L);
        }

    }
...