Я пытаюсь создать пример проекта с использованием Project Reactor и сценария, подобного следующему:
- Издатель, основанный на горячем источнике, который генерирует элемент в секунду.
- Два подписчика (в отдельных потоках): один способен поддерживать темп наблюдаемой активности, а другой настолько медленный, что элементы необходимо буферизовать.
- Выделенный и настраиваемый размерный буфер для каждого подписчика.Когда буфер заполнен, элементы удаляются.
В Java 9 это возможно с использованием класса SubmissionPublisher
:
"Буферизацияпозволяет производителям и потребителям временно работать с разными скоростями. Каждый подписчик использует независимый буфер. Буферы создаются при первом использовании и расширяются по мере необходимости до заданного максимума. "
Я основываю свой примерв этом примере кода, в котором я пытаюсь применить условия, описанные выше:
import java.time.Duration;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class FluxTest {
public static void main(String[] args) {
final ConnectableFlux<Integer> publisher = Flux.range(1, 20)
.delayElements(Duration.ofSeconds(1))
.replay(8);
publisher.publishOn(Schedulers.newSingle("fast"))
.subscribe(i -> {
System.out.println("Fast subscriber - Received " + i);
sleep(1);
});
publisher.publishOn(Schedulers.newSingle("slow"))
.subscribe(i -> {
System.out.println("Slow subscriber - Received " + i);
sleep(5);
});
publisher.connect();
}
private static void sleep(int seconds) {
try {
Thread.sleep(seconds * 1000L);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Это не достигает цели, поскольку элементы все равно буферизуются.Размер буфера применяется только до подписки следующего подписчика.
Как я могу получить независимую буферизацию и отбрасывание для каждого подписчика, используя Flux Project Reactor?
Примечание 1. Я задал связанный вопрос здесь , но,в этом случае вопрос касается только метода.
Примечание 2: Если вам интересно, я пытаюсь кодировать этот пример в Project Reactor, но я изо всех силс этим.