Проход через объект вниз по течению в функциональном конвейере в Reactor - PullRequest
0 голосов
/ 02 декабря 2018

Я новичок в реакторном и реактивном программировании и пытаюсь решить приведенный ниже случай.

Я получаю поток объектов из темы Кафки, и для каждой записи в потоке мне нужно вызвать 2 службы иподтвердите объект.

public void consume(Flux<Data> flux)
{
flux.map(data->callRESTService1(data)).map(...<I need the data once again here to call rest service 2>
}

Сейчас я использую следующий стиль для достижения этой цели, но есть ли лучший / правильный способ сделать это?

public void consume(Flux<Data> flux)
{
   flux.subscribe(data->handleData(data));
}


 public void handleData(data)
    {
 Flux.concat(callRestService1(data),callRestService2(data)).reduce(data,reduce());
    }

Кроме того, если один изслужба не работает, мне нужно сообщить об ошибке слушателю, чтобы сообщение не было подтверждено, но в другом случае, если проверка не удалась, нужно опубликовать сообщение в другой теме.

Ответы [ 2 ]

0 голосов
/ 20 декабря 2018

Вы можете попробовать сжать в flatmap, как это

Flux<Strings>  flux = Flux.just("d");
flux.flatMap(strings -> {
return Flux.zip(callRestService1(strings).onErrorResume(throwable -> dosomeshits(throwable)),callRestService2(strings).onErrorResume(throwable -> dosomeshits(throwable)),(t1, t2) -> t1)

})

0 голосов
/ 10 декабря 2018

Тот факт, что вам нужен исходный элемент для обоих путей и что каждый путь имеет свой способ устранения ошибок, является хорошим индикатором того, что вы, вероятно, хотите flatMap:

Flux<Data> source; //= ...
return source.flatMap(value -> {
    Mono<IgnoreMe1> service1 = callRestService1(value);
    Mono<IgnoreMe2> service2 = callRestService2(value)
        .onErrorResume(e -> postErrorToTopic(e, value)); //might need some type massaging, eg. if the post to topic method returns a `Mono<Void>`

    //wait for the two to complete, propagate their errors if any, else return original value
    return Mono.when(service1, service2)
       .thenReturn(value);
}
...