Я использую Redis Streams с Spring Data Redis 2.2.4. Одна вещь, которую я хочу выяснить, это , как правильно обрабатывать ошибки .
Для нереактивной модели мы устанавливаем ErrorHandler и EancelSubscriptionOnError равными StreamReadRequest
. Затем мы можем контролировать, какое исключение должно отменить подписку в Streams.
Пример:
StreamReadRequest<String> streamReadRequest = StreamReadRequest.builder(offset)
.errorHandler(streamListener.getErrorHandler())
.cancelOnError(e -> false)
.consumer(consumer)
.autoAck(true)
.build();
Но для реактивной модели я не нахожу никаких API-интерфейсов для обработки ошибок для StreamReceiver
. Когда происходит исключение из нашего обработчика бизнес-сообщений. Подписка на Redis Streams будет отменена, и сообщение больше не будет приниматься. Поэтому я должен убедиться, что в коде моей бизнес-логики c нет исключений. Смотрите пример ниже:
public void init() {
StreamOffset<String> offset = StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed());
receiver.receiveAutoAck(consumer, offset)
.flatMap(this::onMessage)
.subscribe();
}
private Mono<Long> onMessage(ObjectRecord<String, MyEvent> message) {
try {
return redisTemplate.opsForStream().acknowledge(groupName, message);
} catch (Exception e) {
return Mono.just(-1L);
}
}