Флюс отправка дубликатов благодаря множеству подписок - PullRequest
0 голосов
/ 08 января 2020

Я пытаюсь использовать Flux в качестве системы очередей. Я хочу иметь возможность записать данные sh на Flux и подписаться на них позже. Большую часть времени я буду использовать элемент pu sh 1 и сразу же подпишусь после него, но в некоторых случаях мне нужно подождать 2/3 элементов и поработать с ними перед подпиской. Мне удалось заставить его работать с UnicastProcessor и Flux примерно так:

private final FluxProcessor<Message> processor = UnicastProcessor.create();
private final Flux<Message> messages = processor.publish().autoconnect();

public void add(Message message){
    processor.onNext(message);
}

public void flush(){
   messages.flatMap(this::doSomeWork)
           .doOnNext(System.out::println)
           .subscribe();
}

public void flush(int buffer){
    messages.take(buffer)
            .flatMap(this::doSomeWork)
            .subscribe();
}

Этот код работает, но он будет накапливать подписки, поэтому он будет печатать одно и то же сообщение x раз для каждого раз я подписался. Я считаю, что это предполагаемое поведение подписки на Flux, но это не то, чего я пытаюсь достичь, и я не могу найти, как это сделать.

Пример того, что я Попытка добиться заключается в следующем: (каждое число представляет новое сообщение, передаваемое в процессор)

--------1-----(*subscribe here*)----2---3---(*subscribe here*)-----4----(*subscribe here*)

И вывод выглядит так:

1

2
3

4

Вместо того, что я Получение:

1

2
3
2
3

4
4
4

Какие-нибудь советы, как мне этого добиться?

1 Ответ

1 голос
/ 08 января 2020

Причина, по которой вы получаете такой результат, заключается в том, что вы не отменили подписку на предыдущие подписки, чтобы они оставались активными и обрабатывали ваши сообщения.

Чтобы отменить их, вы можете изменить свой метод flush() :

public Disposable flush(){
    return messages
            .doOnNext(System.out::println)
            .subscribe();
}

Теперь вы можете вызвать метод dispose (), который отменит вашу подписку, если она больше не нужна.

public void run() {
    add(1);
    flush().dispose();
    System.out.println();
    add(2);
    add(3);
    flush().dispose();
    System.out.println();
    add(4);
    add(5);
    add(6);
    flush().dispose();
    System.out.println();
    add(7);
    flush().dispose();
}

В результате:

1

2
3

4
5
6

7
...