Подписка на что-то запускает цепочку, поэтому вы вообще не можете «цеплять» подписчиков, они являются последними в цепочке.
Подумайте, если это так, вы настроите свой реактивный конвейер икогда вы subscribe
, вы запускаете конвейер, и цепочка выдаст результат.
В веб-сервере subscriber
обычно является вызывающим клиентом, а когда клиент subscribes
запускаетцепочка событий на сервере, которая будет публиковать данные.
A Flux
- это что-то вроде списка от 1 до n Mono
s.Каждый объект в Mono/Flux
имеет, так сказать, несколько «состояний».Это Success
, Error
, Cancel
, Next
, Completed
и др.
Когда Mono/Flux
внутренне переходит в состояние Success
, он испускает значение в нем,Mono
обычно идет Success
, когда что-то разрешается в моно.
, когда вы объявляете Flux.just("userA", "userB", "userC")
, вы в основном просите поток разрешить вход, который вы вводите в него.Размещение строки - это то, что разрешается мгновенно, поэтому поток переходит в состояние Success
и начинает испускать строки, как только что-то Subscribes
.Таким образом, все, что вам нужно сделать, - это объявить цепочку, которую вы хотите создать после кого-то Subscribes
.
Это можно сделать несколькими различными способами, когда вы хотите что-то сделать и изменить значение, как вы хотитеот string
до user
мы обычно используем map
.
Если мы просто хотим что-то сделать с каждым объектом и не возвращать ничего, мы можем использовать doOnNext
.
Flux.just("userA", "userB", "userC")
.parallel(2)
.runOn(Schedulers.parallel())
.map(userString -> {
return lookupService.lookup(userString);
})
.doOnNext(user -> {
// if you want to do something on each user
// will return void so if you want to log something
// or handle each user
}).subscribe();
Подписка должна быть последней вещью в цепочке.