Flux flatMap противодавление - PullRequest
1 голос
/ 03 июля 2019

Как применить противодавление, чтобы ограничить Publisher от производства большего количества предметов, чем может обработать flatMap, работающий параллельно?

1.Настройка

В качестве иллюстрации есть быстрый генератор имени пользователя в твиттере, медленный вызов поиска в твиттере, медленный пишущий файл в твиттере и метод печати.

private Consumer<FluxSink<String>> twitterUsernameGenerator() {
    return (sink) -> Stream.of("a", "b", "c", "d").forEach(sink::next);
}

private Flux<TwitterMessage>findTwitterMessagesByUsername() {
    return Flux.create(sink -> {
            sink.next(new TwitterMessage(...));
            sleep(2000);
            sink.next(new TwitterMessage(...));
        }
    });
}

private void print(Object o) {
    System.out.println("[" + Thread.currentThread().getName() + "] " + o);
}

Конечной целью является параллельный запуск поиска в Твиттере при применении противодавления к генератору, чтобы не выдавать больше имени пользователя, чем может быть обработано (ожидается некоторая предварительная выборка).

2.Генерация имен пользователей в Твиттере

Flux.create(twitterUsernameGenerator())
    .publishOn(Schedulers.single())
    .doOnNext(this::print)
    .subscribe();

Прекрасно создает 5 имен пользователей в Твиттере в одном отдельном потоке

[single-1] a
[single-1] b
[single-1] c
[single-1] d

3.Поиск твиттер-сообщений

Не уверен, что это правильно, но я считаю, что flatMap для создания множества твиттер-сообщений с одним именем пользователя и parallel для выполнения этой интенсивной операции ввода-вывода две темы.

Flux.create(twitterUsernameGenerator())
    .publishOn(Schedulers.single())
    .doOnNext(this::print)
    .parallel(2)
    .runOn(Schedulers.newParallel("p", 2))
    .flatMap(username -> findTwitterMessagesByUsername(username))
    .doOnNext(this::print)
    .subscribe();

Вах!Генератор генерирует имена пользователей быстрее, чем мы можем справиться.

[single-1] a
[single-1] b
[single-1] c
[single-1] d
[p-1] TwitterMessage{...}
[p-2] TwitterMessage{...}
...

3.Применение противодавления к генератору

Как я могу применить противодавление к функции генератора, чтобы результат стал ближе к этому

[single-1] a
[single-1] b
[p-1] TwitterMessage{...}
[single-1] c
[p-2] TwitterMessage{...}
[single-1] d
...

1 Ответ

0 голосов
/ 10 июля 2019

Противодавление происходит в «партиях», превышающих 4 элемента.Если вы модифицируете генератор для создания большего количества имен пользователей, например,

  private Consumer<FluxSink<String>> twitterUsernameGenerator() {
        return (sink) -> IntStream.rangeClosed(0, 100000)
                .boxed().map(String::valueOf)
                .collect(Collectors.toList())
                .forEach(sink::next);
    }

, результирующий поток будет вести себя примерно так же, как и ожидалось.

Вы можете поиграть с добавлением onBackpressureError() к исходному потоку:

Flux.create(twitterUsernameGenerator())
        .onBackpressureError()
        .publishOn(Schedulers.single())
        .....

, который четко сигнализирует о возникновении противодавления, создавая исключение.

...