Reactor EmitterProcessor, который сохраняет только последние n элементов? - PullRequest
0 голосов
/ 28 января 2019

Как мне создать EmitterProcessor, который сохраняет только последние n элементы, так что он также работает, даже если нет подписчиков?

В данный момент я создаю процессоркак это:

EmitterProcessor<Integer> processor = EmitterProcessor.create();

И внешняя система обеспечивает случайные обновления температуры в течение дня.В обратном вызове из этой системы я делаю:

void tempConsumer(int temp) {
    processor.onNext(temp);
}

Однако onNext(...) блокируется после добавления processor.getBufferSize() элементов.

Как создать процессор, который отбрасывает самый старый элемент,в данном случае, а не блокирование?

Это, кажется, в некоторой степени охватывается в активной зоне реактора # 763 .Саймон Басле сначала обсуждает предлагаемое изменение на EmitterProcessor так, чтобы при "отправке данных, когда НЕТ подписчиков [и] очередь содержала bufferSize элементов, самый старый элемент удалялся, а onNextпоставлен в очередь «.Но затем в следующем комментарии он говорит: «Мы не будем продолжать с предложенным выше изменением. Вместо этого мы советуем вам использовать sink() вместо прямого onNext. А именно, использовать обратный вызов onRequest внутриsink() для выполнения столько же sink.next(...), сколько есть запросов. "

Однако, если я правильно понимаю, это касается только случая, когда вы можете вычислять новые элементы по запросу, например, так:

FluxSink<Integer> sink = processor.sink();
Random random = new Random();

sink.onRequest(n -> random.nextInt()); // Generate next n requested elements.

Но в моей ситуации я не могу генерировать последние n показаний температуры по запросу.Конечно, я мог бы поддерживать свой собственный внешний ограниченный буфер последних чтений, а затем читать из него в onRequest(...), но я предполагаю, что Reactor может сделать это для меня?

Я предполагаю, что этот вопрос является дублирующим -но мой Google foo подвел меня здесь.


Ответ Рикарда Коллкаку о том, что нужно использовать ReplayProcessor, кажется правильным решениемВот еще один пример, который я написал, чтобы понять, как его использовать:

ReplayProcessor<Integer> flux = ReplayProcessor.create(Queues.SMALL_BUFFER_SIZE);
FluxSink<Integer> sink = flux.sink();

// ReplayProcessor.getBufferSize() returns unbounded,
// while CAPACITY returns the capacity of the underlying buffer.
int capacity = flux.scan(Scannable.Attr.CAPACITY);

// Add twice as many elements as the underlying buffer can take.
int count = capacity * 2;

for (int i = 0; i < count; i++) {
    sink.next(i);
}

// If `capacity` is 256, this will print value 256 thru to 511.
flux.subscribe(System.out::println);

Я также нашел этот раздел , в Практическое реактивное программирование с Reactor, полезно для объяснения вещей.

1 Ответ

0 голосов
/ 28 января 2019

Вы должны использовать ReplayProcessor, как в этом примере:

 ReplayProcessor<Integer> directProcessor = ReplayProcessor.cacheLast();

    Flux.range(1, 10)
            .map(integer -> {
                directProcessor.onNext(integer);
                return integer;
            }).doOnComplete(() -> {
        directProcessor.subscribe(System.out::println);
        directProcessor.subscribe(System.out::println);
    })
            .subscribe();
...