Противодавление распространяется от конечного амортизатора вверх по всей цепи.Но операторы в цепочке могут запрашивать данные заранее (предварительную выборку) или даже «переписывать» запрос.Например, в случае buffer(3)
, если этот оператор получает request(1)
, он будет выполнять request(3)
в восходящем направлении ("1 буфер == макс. 3 элемента, поэтому я могу запросить свой источник достаточно, чтобы заполнить 1 буфер, который мне был запрошен").").
Если входные данные всегда предоставляются пользователем, их будет трудно абстрагировать ...
Нет простого способа оценить ограничение источников для нескольких конвейеров или даже несколькихподписки на данный конвейер (a Flux
).
Использование общего Scheduler
в нескольких publishOn
не будет работать, поскольку publishOn
выбирает поток Worker
и придерживается его.
Однако, если ваш вопрос более конкретно касается ограничения задачи base64
, возможно, эффект можно получить из параметра одновременности flatMap
?
input.flatMap(someString -> asyncProcess(someString), 3, 1);
Это позволит максимально 3вхождения asyncProcess
запускаются, и каждый раз, когда один завершается, он начинает новое со следующего значения из input
.