Как применить противодавление, чтобы ограничить Publisher
от производства большего количества предметов, чем может обработать flatMap
, работающий параллельно?
1.Настройка
В качестве иллюстрации есть быстрый генератор имени пользователя в твиттере, медленный вызов поиска в твиттере, медленный пишущий файл в твиттере и метод печати.
private Consumer<FluxSink<String>> twitterUsernameGenerator() {
return (sink) -> Stream.of("a", "b", "c", "d").forEach(sink::next);
}
private Flux<TwitterMessage>findTwitterMessagesByUsername() {
return Flux.create(sink -> {
sink.next(new TwitterMessage(...));
sleep(2000);
sink.next(new TwitterMessage(...));
}
});
}
private void print(Object o) {
System.out.println("[" + Thread.currentThread().getName() + "] " + o);
}
Конечной целью является параллельный запуск поиска в Твиттере при применении противодавления к генератору, чтобы не выдавать больше имени пользователя, чем может быть обработано (ожидается некоторая предварительная выборка).
2.Генерация имен пользователей в Твиттере
Flux.create(twitterUsernameGenerator())
.publishOn(Schedulers.single())
.doOnNext(this::print)
.subscribe();
Прекрасно создает 5 имен пользователей в Твиттере в одном отдельном потоке
[single-1] a
[single-1] b
[single-1] c
[single-1] d
3.Поиск твиттер-сообщений
Не уверен, что это правильно, но я считаю, что flatMap
для создания множества твиттер-сообщений с одним именем пользователя и parallel
для выполнения этой интенсивной операции ввода-вывода две темы.
Flux.create(twitterUsernameGenerator())
.publishOn(Schedulers.single())
.doOnNext(this::print)
.parallel(2)
.runOn(Schedulers.newParallel("p", 2))
.flatMap(username -> findTwitterMessagesByUsername(username))
.doOnNext(this::print)
.subscribe();
Вах!Генератор генерирует имена пользователей быстрее, чем мы можем справиться.
[single-1] a
[single-1] b
[single-1] c
[single-1] d
[p-1] TwitterMessage{...}
[p-2] TwitterMessage{...}
...
3.Применение противодавления к генератору
Как я могу применить противодавление к функции генератора, чтобы результат стал ближе к этому
[single-1] a
[single-1] b
[p-1] TwitterMessage{...}
[single-1] c
[p-2] TwitterMessage{...}
[single-1] d
...