Как использовать выделенный буфер для каждого подписчика в Project Reactor? - PullRequest
0 голосов
/ 26 апреля 2018

Я пытаюсь создать пример проекта с использованием Project Reactor и сценария, подобного следующему:

  • Издатель, основанный на горячем источнике, который генерирует элемент в секунду.
  • Два подписчика (в отдельных потоках): один способен поддерживать темп наблюдаемой активности, а другой настолько медленный, что элементы необходимо буферизовать.
  • Выделенный и настраиваемый размерный буфер для каждого подписчика.Когда буфер заполнен, элементы удаляются.

В Java 9 это возможно с использованием класса SubmissionPublisher:

"Буферизацияпозволяет производителям и потребителям временно работать с разными скоростями. Каждый подписчик использует независимый буфер. Буферы создаются при первом использовании и расширяются по мере необходимости до заданного максимума. "

Я основываю свой примерв этом примере кода, в котором я пытаюсь применить условия, описанные выше:

import java.time.Duration;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxTest {

  public static void main(String[] args) {
    final ConnectableFlux<Integer> publisher = Flux.range(1, 20)
      .delayElements(Duration.ofSeconds(1))
      .replay(8);

    publisher.publishOn(Schedulers.newSingle("fast"))
      .subscribe(i -> {
        System.out.println("Fast subscriber - Received " + i);
        sleep(1);
      });

    publisher.publishOn(Schedulers.newSingle("slow"))
      .subscribe(i -> {
        System.out.println("Slow subscriber - Received " + i);
        sleep(5);
      });

    publisher.connect();
  }

  private static void sleep(int seconds) {
    try {
      Thread.sleep(seconds * 1000L);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Это не достигает цели, поскольку элементы все равно буферизуются.Размер буфера применяется только до подписки следующего подписчика.

Как я могу получить независимую буферизацию и отбрасывание для каждого подписчика, используя Flux Project Reactor?

Примечание 1. Я задал связанный вопрос здесь , но,в этом случае вопрос касается только метода.

Примечание 2: Если вам интересно, я пытаюсь кодировать этот пример в Project Reactor, но я изо всех силс этим.

...