Я, наконец, пришел к выводу, что нестандартный текучий оператор может быть приемлемым решением как для управления переполнением (через противодавление), так и недостаточным (через буфер, который фактически ведет себя как резервуар элементов, чтобы избежать голодания).
Когда этот оператор подключен к протекающему трубопроводу, резервуар элементов заполняется элементами, выделяемыми производителем.Потребитель берет элементы, хранящиеся в резервуаре, по требованию.Производитель и потребитель работают с двумя различными планировщиками.
Ситуация переполнения:
- Когда резервуар заполнен, у производителя больше не требуется испускать новые элементы(из-за противодавления в трубопроводе).
Ситуация недостаточного расхода:
- Резервуар используется для питания потребителя элементами, по требованию.
Жизненный цикл
- Буферная фаза: резервуар заполнен, пока не заполнится.На этом этапе потребитель не получает никаких элементов.
- Фаза передачи: потребитель берет элементы, хранящиеся в резервуаре, когда это необходимо;производитель хранит элементы в резервуаре, пока он не заполнен.
- Фаза промывки: производитель завершил;потребитель забирает все элементы, оставшиеся в резервуаре, и, наконец, получает сигнал о завершении.
Этот оператор неблокирует как в ситуациях переполнения, так и в ситуациях переполнения.
Использование:
ReservoirOperator op = new ReservoirOperator(bufferSize, Schedulers.io());
upstream.lift(op).subscribe(consumer);