Project Reactor - объедините два издателя и выведите результат - PullRequest
0 голосов
/ 29 мая 2019

Я хотел бы спроектировать конвейер обработки с помощью Reactor, который выполняет следующие действия:

У меня есть два входных издателя: orderEntries (холодный) и hotBroadcasts (горячий).Я хотел бы объединить элементы, испускаемые hotBroadcasts, в (изменяемую) структуру данных памяти - скажем, HashMap - и для каждого элемента из orderEntries я хотел бы выбрать соответствующий элемент из этой карты, создать результирующийitem и push для последующего подписчика.

События от hotBroadcasts происходят в произвольном порядке, поэтому я хочу сохранить их в памяти для удобства поиска.

Концептуально, это должно работать так:

       orderEntries                      hotBroadcasts
           |                                   | 
           |                                   | 
           |                                   | 
           \                                   / 
            ----------------> <----------------
                   (aggregate events from hotBroadcasts)     
                             |
                             |
                        resulting item
                             |
                             |
                            \/
                      downstream subcriber  

Пока мне удалось набросать решение с помощью ReplayProcessor, проиллюстрированного псевдо-примером Kotlin:

val orderEntries = Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
val hotBroadcasts = ReplayProcessor.create<String>(1000, false)

orderEntries.concatMap { entryId ->
    // problematic filter - skims through all that ReplayProcessor has cached
    hotBroadcasts.filter { broadcastId ->
        "Broadcast:$entryId" == broadcastId
    }
    .take(1)
    .map { "EntryId: $entryId, BroadcastId: $it" }
}.subscribe { LOG.info(it) }

Flux.interval(Duration.of(200, ChronoUnit.MILLIS))
        .concatMap { Flux.just(it, it - 100000) }
        .map { "Broadcast:$it" }
        .subscribe {
            hotBroadcasts.onNext(it)
        }

Проблема здесь в том, что фильтрация hotBroadcastпросматривает все предметы для каждого предмета из orderEntries.Отсюда моя идея хранить их в HashMap.

Может ли кто-нибудь указать мне правильное направление?

1 Ответ

0 голосов
/ 03 июня 2019

Объект, который может объединять сообщения от двух разных издателей, представляет собой асинхронный вызов процедуры с 2 параметрами.Такой вызов может быть сконструирован в rxjava с использованием io.reactivex.Single.zip(SingleSource arg1, SingleSource arg2, BiFunction func) или в чистой Java с использованием java.util.concurrent.CompletableFuture.thenCombine(CompletionStage arg2, BiFunction func).

. Вам нужен специальный HashMap, содержащий эти асинхронные вызовы процедур.Вызовы должны создаваться автоматически при первом обращении к этой HashMap с данной меткой.

Таким образом, один Publicher вызывает

asyncProc=callMap.get(label); // asyncProc is created and stored with the label as a key
asyncProc.arg1.complete(value);

, а другой Publicher вызывает

asyncProc=callMap.get(label); // previously created instance returned
asyncProc.arg2.complete(value);

После того, как оба издателя предоставили свои аргументы, выполняется асинхронная процедура.

...