Я пытаюсь использовать Flux
в качестве системы очередей. Я хочу иметь возможность записать данные sh на Flux
и подписаться на них позже. Большую часть времени я буду использовать элемент pu sh 1 и сразу же подпишусь после него, но в некоторых случаях мне нужно подождать 2/3 элементов и поработать с ними перед подпиской. Мне удалось заставить его работать с UnicastProcessor
и Flux
примерно так:
private final FluxProcessor<Message> processor = UnicastProcessor.create();
private final Flux<Message> messages = processor.publish().autoconnect();
public void add(Message message){
processor.onNext(message);
}
public void flush(){
messages.flatMap(this::doSomeWork)
.doOnNext(System.out::println)
.subscribe();
}
public void flush(int buffer){
messages.take(buffer)
.flatMap(this::doSomeWork)
.subscribe();
}
Этот код работает, но он будет накапливать подписки, поэтому он будет печатать одно и то же сообщение x раз для каждого раз я подписался. Я считаю, что это предполагаемое поведение подписки на Flux
, но это не то, чего я пытаюсь достичь, и я не могу найти, как это сделать.
Пример того, что я Попытка добиться заключается в следующем: (каждое число представляет новое сообщение, передаваемое в процессор)
--------1-----(*subscribe here*)----2---3---(*subscribe here*)-----4----(*subscribe here*)
И вывод выглядит так:
1
2
3
4
Вместо того, что я Получение:
1
2
3
2
3
4
4
4
Какие-нибудь советы, как мне этого добиться?