Подключение Flux / Publisher / Subscriber к постоянному генератору - PullRequest
0 голосов
/ 23 января 2019

У меня есть класс, который постоянно генерирует и сохраняет новые значения данных (используя пул потоков).Я хочу предоставить средство для клиентского кода («подписчики»), чтобы подключиться к этой последовательности новых значений данных (подключиться к ней).Однако, если у моего класса нет клиентов или все клиенты закончили чтение из последовательности, я хочу, чтобы он продолжал генерировать и хранить новые значения без остановки.Когда клиент подключается к этой последовательности, он получает вновь созданные значения, но не значения, созданные в прошлом.Какой класс Project Reactor (или классы) подходят для этого?

Я думаю, мне нужно представить последовательность новых значений, используя Flux, но какой класс Flux (или метод фабрики) использовать

1 Ответ

0 голосов
/ 23 января 2019

Использовать DirectProcessor

Как я понял, нужна возможность подписки на апстрим вне зависимости от того, есть подписчики или нет.

Это может быть достигнуто с поддержкой DirectProcessor. Поскольку Processor является комбинацией Publisher и Subscriber, он может «работать» в восходящем направлении и непрерывно прослушивать входящие сигналы. В то же время, DirectProcessor включает демультиплексирование сообщений или просто широковещательную рассылку сообщений всем доступным абонентам нисходящего направления (если они слушают).

Например, давайте рассмотрим следующий пример кода:

Flux<Long> intervalFlux = Flux.interval(Duration.ofMillis(500)).log("upstream");
DirectProcessor processor = DirectProcessor.create();

intervalFlux.subscribe(processor);

Thread.sleep(2000);

Disposable downstream1 = processor.log("downstream1")
                                  .subscribe();

Thread.sleep(1000);

downstream1.dispose();

Thread.sleep(1000);

Disposable downstream2 = processor.log("downstream2")
                                  .subscribe();
Thread.sleep(2000);

Как мы видим, мы подписались на апстрим с использованием процессора, поэтому интервал Flux начинает генерировать данные. Затем мы подписались на processor и ждали 1 секунду, поэтому два события должны наблюдаться downstream1 и 6, которые регистрируются в целом оператором log("upstream"). После этого мы отменили подписку, поэтому подписчик downstream1 должен прекратить наблюдать за любыми событиями, но log("upstream") должен по-прежнему соблюдать интервалы. Затем, после очередной паузы, мы подписались на поток с другим downstrea2 подписчиком, который должен наблюдать еще четыре события.

Общий вывод вышеуказанного кода следующий:

2019-01-23 15:09:04,246 INFO upstream [main] onSubscribe(FluxInterval.IntervalRunnable)
2019-01-23 15:09:04,249 INFO upstream [main] request(unbounded)
2019-01-23 15:09:04,757 INFO upstream [parallel-1] onNext(0)
2019-01-23 15:09:05,252 INFO upstream [parallel-1] onNext(1)
2019-01-23 15:09:05,751 INFO upstream [parallel-1] onNext(2)
2019-01-23 15:09:06,252 INFO upstream [parallel-1] onNext(3)
2019-01-23 15:09:06,258 INFO downstream1 [main] onSubscribe(DirectProcessor.DirectInner)
2019-01-23 15:09:06,258 INFO downstream1 [main] request(unbounded)
2019-01-23 15:09:06,754 INFO upstream [parallel-1] onNext(4)
2019-01-23 15:09:06,755 INFO downstream1 [parallel-1] onNext(4)
2019-01-23 15:09:07,254 INFO upstream [parallel-1] onNext(5)
2019-01-23 15:09:07,254 INFO downstream1 [parallel-1] onNext(5)
2019-01-23 15:09:07,263 INFO downstream1 [main] cancel()
2019-01-23 15:09:07,755 INFO upstream [parallel-1] onNext(6)
2019-01-23 15:09:08,255 INFO upstream [parallel-1] onNext(7)
2019-01-23 15:09:08,265 INFO downstream2 [main] onSubscribe(DirectProcessor.DirectInner)
2019-01-23 15:09:08,265 INFO downstream2 [main] request(unbounded)
2019-01-23 15:09:08,755 INFO upstream [parallel-1] onNext(8)
2019-01-23 15:09:08,756 INFO downstream2 [parallel-1] onNext(8)
2019-01-23 15:09:09,255 INFO upstream [parallel-1] onNext(9)
2019-01-23 15:09:09,256 INFO downstream2 [parallel-1] onNext(9)
2019-01-23 15:09:09,751 INFO upstream [parallel-1] onNext(10)
2019-01-23 15:09:09,751 INFO downstream2 [parallel-1] onNext(10)
2019-01-23 15:09:10,255 INFO upstream [parallel-1] onNext(11)
2019-01-23 15:09:10,255 INFO downstream2 [parallel-1] onNext(11)

Как мы видим, DirectProcessor обеспечивает требуемое поведение, поэтому он, вероятно, хорошо вписывается в него.

Примечание

DirectProcessor не поддерживает противодавление, поэтому в случае важности противодавления можно использовать оператор limitRate * оператор 1031 *.

См. Также

https://projectreactor.io/docs/core/release/reference/#_direct_processor

...