Как связать поток с другим потоком / моно и применить другое обратное давление? - PullRequest
0 голосов
/ 04 ноября 2018

Ниже приведен реактивный код с использованием флюса в активной зоне реактора:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> redisHashReactiveCommands.hmset(key, map))
    //.flatMap(... //want to store same data async into kafka with its own back pressure handling)
    .subscribeOn(Schedulers.parallel())
    .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
    .doOnComplete(() -> log.debug("On completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();

Как видите, у меня есть обратное давление для внешнего источника в мой процесс (FluxSink.OverflowStrategy.LATEST). Тем не менее, я также хочу настроить обратное давление для моего процесса на redis (redisHashReactiveCommands.hmset (key, map)), так как это может быть более узким местом, чем внешний источник для моего процесса. Я ожидаю, что мне нужно будет создать другой поток для части redis и связать его с этим потоком, но как мне этого добиться, поскольку .flatMap работает с отдельным элементом, а не с потоком элементов?

Кроме того, я хочу сохранить тот же испускаемый элемент в Kafka, но цепочка flapMap, похоже, не работает ... есть ли простой способ связать все это вместе в одном наборе функциональных вызовов (внешний источник -> мой процесс, мой процесс -> редис, мой процесс -> кафка)?

1 Ответ

0 голосов
/ 05 ноября 2018

Если вас не интересуют объекты результата в основной последовательности , вы можете объединить оба сохранения из flatMap. Вам нужно будет переместить подписку и войти в flatMap, чтобы поместить их во внутренних сохраняющих издателей:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> Mono.when(
        redisHashReactiveCommands.hmset(key, map)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),

        kafkaReactiveCommand.something(map)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s)),
    ))
    //... this results in a Mono<Void>
    .doOnComplete(() -> log.debug("Both redis and kafka completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();

В качестве альтернативы, если вы уверены , что оба процесса выдают либо элемент результата, либо ошибку, вы можете объединить оба результата в Tuple2, заменив when на zip.

...