Как мне создать EmitterProcessor
, который сохраняет только последние n элементы, так что он также работает, даже если нет подписчиков?
В данный момент я создаю процессоркак это:
EmitterProcessor<Integer> processor = EmitterProcessor.create();
И внешняя система обеспечивает случайные обновления температуры в течение дня.В обратном вызове из этой системы я делаю:
void tempConsumer(int temp) {
processor.onNext(temp);
}
Однако onNext(...)
блокируется после добавления processor.getBufferSize()
элементов.
Как создать процессор, который отбрасывает самый старый элемент,в данном случае, а не блокирование?
Это, кажется, в некоторой степени охватывается в активной зоне реактора # 763 .Саймон Басле сначала обсуждает предлагаемое изменение на EmitterProcessor
так, чтобы при "отправке данных, когда НЕТ подписчиков [и] очередь содержала bufferSize
элементов, самый старый элемент удалялся, а onNext
поставлен в очередь «.Но затем в следующем комментарии он говорит: «Мы не будем продолжать с предложенным выше изменением. Вместо этого мы советуем вам использовать sink()
вместо прямого onNext
. А именно, использовать обратный вызов onRequest
внутриsink()
для выполнения столько же sink.next(...)
, сколько есть запросов. "
Однако, если я правильно понимаю, это касается только случая, когда вы можете вычислять новые элементы по запросу, например, так:
FluxSink<Integer> sink = processor.sink();
Random random = new Random();
sink.onRequest(n -> random.nextInt()); // Generate next n requested elements.
Но в моей ситуации я не могу генерировать последние n показаний температуры по запросу.Конечно, я мог бы поддерживать свой собственный внешний ограниченный буфер последних чтений, а затем читать из него в onRequest(...)
, но я предполагаю, что Reactor может сделать это для меня?
Я предполагаю, что этот вопрос является дублирующим -но мой Google foo подвел меня здесь.
Ответ Рикарда Коллкаку о том, что нужно использовать ReplayProcessor
, кажется правильным решениемВот еще один пример, который я написал, чтобы понять, как его использовать:
ReplayProcessor<Integer> flux = ReplayProcessor.create(Queues.SMALL_BUFFER_SIZE);
FluxSink<Integer> sink = flux.sink();
// ReplayProcessor.getBufferSize() returns unbounded,
// while CAPACITY returns the capacity of the underlying buffer.
int capacity = flux.scan(Scannable.Attr.CAPACITY);
// Add twice as many elements as the underlying buffer can take.
int count = capacity * 2;
for (int i = 0; i < count; i++) {
sink.next(i);
}
// If `capacity` is 256, this will print value 256 thru to 511.
flux.subscribe(System.out::println);
Я также нашел этот раздел , в Практическое реактивное программирование с Reactor, полезно для объяснения вещей.