Объедините два экземпляра Flux разных типов - PullRequest
0 голосов
/ 12 декабря 2018

С SpringBoot 2 и с классом Poi (Точка интереса):

public class Poi {
public Poi(String poidId, Double price, Double latitude, Double longitude) {...}
private String poidId;
private Double latitude;
private Double longitude;
private Double price;
//And Getters and Setters
}

У меня есть 2 потока Poi:

Flux<Poi> availablePoisFlux;
Flux<Poi> poiFlux;

Первый элемент availablePoisFlux содержит Pois с:

  • a poidId
  • NO информация о широте
  • NO информация о долготе
  • информация о цене

Второй элемент poiFlux содержит Pois с:

  • poidId
  • широта
  • долгота
  • НЕТ информации о цене

(poidId - это идентификатор Poi).

Я хочу создать новый Flux resultPoisFlux с Pois (с poidId, цена,долгота и широта) от двух Flux (poiFlux и availablePoisFlux).

Атрибут poidId является ключом между двумя потоками (poiFlux и availablePoisFlux).

Пример реализации:

Я думаю, что для этого можно использовать оператор zipWith, ноМне нужна информация и советы с реактивными операторами (и фильтром?)

Я хочу перебрать первый поток и получить информацию (цену) из второго потока, используя идентификатор poidId и обновить атрибут цены с правильным значением.

Пример входных значений:

poiFlux = Poi(poidId=poiId0, price=null, name=name0, latitude=2.2222, longitude=14.222)
poiFlux = Poi(poidId=poiId1, price=null, name=name1, latitude=3.2222, longitude=15.222)
poiFlux = Poi(poidId=poiId2, price=null, name=name2, latitude=4.2222, longitude=16.222)
poiFlux = Poi(poidId=poiId3, price=null, name=name3, latitude=5.2222, longitude=17.222)
poiFlux = Poi(poidId=poiId4, price=null, name=name4, latitude=6.2222, longitude=18.222)
poiFlux = Poi(poidId=poiId5, price=null, name=name5, latitude=7.2222, longitude=19.222)
poiFlux = Poi(poidId=poiId6, price=null, name=name6, latitude=8.2222, longitude=20.222)
poiFlux = Poi(poidId=poiId7, price=null, name=name7, latitude=9.2222, longitude=21.222)
poiFlux = Poi(poidId=poiId8, price=null, name=name8, latitude=10.2222, longitude=22.222)
poiFlux = Poi(poidId=poiId9, price=null, name=name9,  latitude=11.2222, longitude=23.222)

availablePoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=null, longitude=null)

Ожидаемый результат:

resultPoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=2.2222, longitude=14.222)
resultPoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=3.2222, longitude=15.222)
resultPoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=4.2222, longitude=16.222)
resultPoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=5.2222, longitude=17.222)
resultPoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=6.2222, longitude=18.222)

Примерно так:

Flux<Poi> resultPoisFlux = availablePoisFlux.zipWith(poiFlux, (a, b) -> new Poi(a.getPoidId(), a.getPrice(), getLatitudeFromPoiFluxByPoidId(a.getPoidId()), getLongitudeFromPoiFluxByPoidId(a.getPoidId())))....

Спасибо за помощь.

1 Ответ

0 голосов
/ 13 декабря 2018

zip/zipWith, но попарно объединяет только два источника ...

..., если в нем достаточно элементов для создания пар.Так что в вашем случае полезно, только если вы гарантировано , что элементы располагаются в одном и том же порядке в обоих источниках, и нет никаких расхождений в poiIds с каждой стороны.В вашем примере это так, потому что, хотя второй источник имеет только 4 элемента, эти элементы совпадают с началом первого источника.

poiFlux.zipWith(availablePoisFlux, (a, b) -> new Poi(a.getPoiId(), 
    b.getPrice(),
    a.getLatitude(),
    a.getLongitude(),
    a.getName()));

Более общее решение, менее реактивное

Если такой гарантии нет, вам нужно как-то объединить две неупорядоченные и непересекающиеся последовательности.Вы не можете сделать это, не собрав все элементы в одном из источников (предпочтительно availablePoisFlux), что означает, что это задержит обработку другого источника до завершения указанного источника.

Один из способов объединения будетсобрать все значения в карту с ключом poiId и затем «перебрать» второй источник.Поскольку некоторые элементы могут быть не найдены на карте, вам нужно handle, чтобы иметь возможность "пропустить" эти:

availablePoisFlux.collectMap(Poi::getId, Poi::getPrice)
    .flatMapMany(knownPrices -> poiFlux.handle((poi, sink) -> {
        String poiId = poi.getPoiId();
        if (knownPrices.containsKey(poiId) {
            Double price = knownPrices.get(poiId);
            Poi complete = new Poi(poiId, price, poi.getLatitude(),
                poi.getLongitude(), poi.getName());
            sink.next(complete);
        } //else do nothing and let handle skip that poi
    }));
...