Определение происхождения байтовых массивов в реактивной цепочке в Project Reactor - PullRequest
0 голосов
/ 29 октября 2018

Я строю цветное изображение из 3 изображений в оттенках серого. Я делаю 3 отдельных (параллельных) запроса для каждой цветовой полосы, используя rsocket, который возвращает Mono<byte[]>. Мне нужно собрать все 3-байтовые массивы в виде списка, чтобы я мог правильно построить изображение. Тем не менее, я запутался в том, как определить, к какой полосе принадлежит байтовый массив в следующей реактивной операции.

Вот пример кода:

 return Flux.just(redRequest, blueRequest, greenRequest)
                .parallel()
                .flatMap(client::getBytes) // calls the remote service to get the bytes for the given color band
                .sequential()
                .collectList()
                .map(response -> {
                   byte[] redBytes = response.get(0); //???
                }

Я создал класс-обертку для хранения исходного запроса, байтового массива и идентификатора полосы, чтобы я мог передавать все объекты в каждой операции, но поскольку ответ от моего клиента rsocket - Mono, я могу только материализоваться (возможно, неправильная терминология) байтовый массив, вызывая его в карте или flatMap, и в этот момент у меня нет доступа к моему классу-обертке, и я не уверен, к какой полосе принадлежит байтовый массив.

Можно ли это исправить, просто не вызывая клиента в параллельном запросе? Могу ли я когда-нибудь быть уверенным, что предметы будут распространяться по цепочке в порядке, который я определил в Flux.just().

По сути, на последней карте я просто хочу узнать, какой массив байтов принадлежит какой цветовой полосе.

1 Ответ

0 голосов
/ 29 октября 2018

Оберните каждый запрос в объекте цветным полем и измените кусок flatMap вашего конвейера, который отправляет эти запросы:

Flux.just(
  new Request(redRequest, RED),
  new Request(blueRequest, BLUE),
  new Request(greenRequest, GREEN)
)
.parallel()
.flatMap(request -> 
  client.getBytes(request)
    .map(response -> new Response(response.get(0), request.color))
)
.sequential()
.collectList()
.map(response -> {
  byte[] redBytes = response.bytes;
  Color color = response.color;
  // 
})
...