Я хотел бы спроектировать конвейер обработки с помощью Reactor, который выполняет следующие действия:
У меня есть два входных издателя: orderEntries
(холодный) и hotBroadcasts
(горячий).Я хотел бы объединить элементы, испускаемые hotBroadcasts
, в (изменяемую) структуру данных памяти - скажем, HashMap
- и для каждого элемента из orderEntries
я хотел бы выбрать соответствующий элемент из этой карты, создать результирующийitem и push для последующего подписчика.
События от hotBroadcasts
происходят в произвольном порядке, поэтому я хочу сохранить их в памяти для удобства поиска.
Концептуально, это должно работать так:
orderEntries hotBroadcasts
| |
| |
| |
\ /
----------------> <----------------
(aggregate events from hotBroadcasts)
|
|
resulting item
|
|
\/
downstream subcriber
Пока мне удалось набросать решение с помощью ReplayProcessor
, проиллюстрированного псевдо-примером Kotlin:
val orderEntries = Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
val hotBroadcasts = ReplayProcessor.create<String>(1000, false)
orderEntries.concatMap { entryId ->
// problematic filter - skims through all that ReplayProcessor has cached
hotBroadcasts.filter { broadcastId ->
"Broadcast:$entryId" == broadcastId
}
.take(1)
.map { "EntryId: $entryId, BroadcastId: $it" }
}.subscribe { LOG.info(it) }
Flux.interval(Duration.of(200, ChronoUnit.MILLIS))
.concatMap { Flux.just(it, it - 100000) }
.map { "Broadcast:$it" }
.subscribe {
hotBroadcasts.onNext(it)
}
Проблема здесь в том, что фильтрация hotBroadcast
просматривает все предметы для каждого предмета из orderEntries
.Отсюда моя идея хранить их в HashMap.
Может ли кто-нибудь указать мне правильное направление?