Как зацепить подписчиков реактора - PullRequest
0 голосов
/ 01 июля 2019

У меня есть существующая цепочка интерфейсов, которую я хочу запустить как реактор вместо того, чтобы управлять своими собственными потоками и очередями

public interface UserLookupService {
    public User lookup(String id);
}
public interface UsersHandler {
    public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...

// Works well to lookup users in parallel. 
Flux.just("userA", "userB", "userC")
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(str -> {
        userSvc.lookup(str);
    });

Как связать полученный результат, чтобы он вызывал UsersHandler с партиями User?

1 Ответ

2 голосов
/ 01 июля 2019

Подписка на что-то запускает цепочку, поэтому вы вообще не можете «цеплять» подписчиков, они являются последними в цепочке.

Подумайте, если это так, вы настроите свой реактивный конвейер икогда вы subscribe, вы запускаете конвейер, и цепочка выдаст результат.

В веб-сервере subscriber обычно является вызывающим клиентом, а когда клиент subscribes запускаетцепочка событий на сервере, которая будет публиковать данные.

A Flux - это что-то вроде списка от 1 до n Mono s.Каждый объект в Mono/Flux имеет, так сказать, несколько «состояний».Это Success, Error, Cancel, Next, Completed и др.

Когда Mono/Flux внутренне переходит в состояние Success, он испускает значение в нем,Mono обычно идет Success, когда что-то разрешается в моно.

, когда вы объявляете Flux.just("userA", "userB", "userC"), вы в основном просите поток разрешить вход, который вы вводите в него.Размещение строки - это то, что разрешается мгновенно, поэтому поток переходит в состояние Success и начинает испускать строки, как только что-то Subscribes.Таким образом, все, что вам нужно сделать, - это объявить цепочку, которую вы хотите создать после кого-то Subscribes.

Это можно сделать несколькими различными способами, когда вы хотите что-то сделать и изменить значение, как вы хотитеот string до user мы обычно используем map.

Если мы просто хотим что-то сделать с каждым объектом и не возвращать ничего, мы можем использовать doOnNext.

Flux.just("userA", "userB", "userC")
            .parallel(2)
            .runOn(Schedulers.parallel())
            .map(userString -> {
                return lookupService.lookup(userString);
            })
            .doOnNext(user -> {
                // if you want to do something on each user
                // will return void so if you want to log something
                // or handle each user
            }).subscribe();

Подписка должна быть последней вещью в цепочке.

...