Ниже приведен реактивный код с использованием флюса в активной зоне реактора:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> redisHashReactiveCommands.hmset(key, map))
//.flatMap(... //want to store same data async into kafka with its own back pressure handling)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
.doOnComplete(() -> log.debug("On completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
Как видите, у меня есть обратное давление для внешнего источника в мой процесс (FluxSink.OverflowStrategy.LATEST). Тем не менее, я также хочу настроить обратное давление для моего процесса на redis (redisHashReactiveCommands.hmset (key, map)), так как это может быть более узким местом, чем внешний источник для моего процесса. Я ожидаю, что мне нужно будет создать другой поток для части redis и связать его с этим потоком, но как мне этого добиться, поскольку .flatMap работает с отдельным элементом, а не с потоком элементов?
Кроме того, я хочу сохранить тот же испускаемый элемент в Kafka, но цепочка flapMap, похоже, не работает ... есть ли простой способ связать все это вместе в одном наборе функциональных вызовов (внешний источник -> мой процесс, мой процесс -> редис, мой процесс -> кафка)?