Как реализовать буфер, размер которого зависит от скорости потребителя и производителя с RxJava? - PullRequest
0 голосов
/ 24 мая 2018

Требования следующие:

  1. Когда производитель работает быстрее, чем потребитель, произведенные элементы должны буферизироваться до тех пор, пока буфер не будет заполнен.Когда буфер заполнен, от производителя больше не требуется производить новые элементы (обратное давление).
  2. Когда потребитель работает быстрее, чем производитель, буферизованные элементы испускаются для компенсации.Если буфер опустеет, произойдет голодание, но мы ничего не можем сделать в этом случае.
  3. Цель буфера - снизить риск голодания;следовательно, он должен оставаться заполненным до тех пор, пока производитель быстрее, чем потребитель.

Я пробовал разные текучие операторы.Тот, который очень близок к этим требованиям, - это объект с размером буфера.Но за этим оператором скрывается некоторая магия: когда буфер заполнен, он постепенно очищается до тех пор, пока его размер не станет ниже 25% от его общей емкости, и в ходе этого процесса ни один элемент не запрашивается у производителя.

Какойоператор или комбинацию операторов я буду использовать для выполнения этих требований?

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 30 мая 2018

Я, наконец, пришел к выводу, что нестандартный текучий оператор может быть приемлемым решением как для управления переполнением (через противодавление), так и недостаточным (через буфер, который фактически ведет себя как резервуар элементов, чтобы избежать голодания).

Когда этот оператор подключен к протекающему трубопроводу, резервуар элементов заполняется элементами, выделяемыми производителем.Потребитель берет элементы, хранящиеся в резервуаре, по требованию.Производитель и потребитель работают с двумя различными планировщиками.

Ситуация переполнения:

  • Когда резервуар заполнен, у производителя больше не требуется испускать новые элементы(из-за противодавления в трубопроводе).

Ситуация недостаточного расхода:

  • Резервуар используется для питания потребителя элементами, по требованию.

Жизненный цикл

  • Буферная фаза: резервуар заполнен, пока не заполнится.На этом этапе потребитель не получает никаких элементов.
  • Фаза передачи: потребитель берет элементы, хранящиеся в резервуаре, когда это необходимо;производитель хранит элементы в резервуаре, пока он не заполнен.
  • Фаза промывки: производитель завершил;потребитель забирает все элементы, оставшиеся в резервуаре, и, наконец, получает сигнал о завершении.

Этот оператор неблокирует как в ситуациях переполнения, так и в ситуациях переполнения.

Использование:

ReservoirOperator op = new ReservoirOperator(bufferSize, Schedulers.io());
upstream.lift(op).subscribe(consumer);
0 голосов
/ 27 мая 2018

Используйте несколько rebatchRequests (который использует наблюдений под крышками), чтобы искусственно увеличить уровень, на котором делаются дальнейшие запросы.

...