Как отменить подписку на элемент из потока - PullRequest
0 голосов
/ 07 декабря 2018

Рассмотрим следующий поток

FluxSink<String> sink;
Flux<String> flux1 = Flux
    .<String>create(emitter -> { 
         sink = emitter;
    },...)
    .cache()
    .publish()
    .autoConnect();

Таким образом, чтобы добавить / подписать элемент, мы можем сделать sink.next(“4”);

flux1.subscribe(item -> log.info(“item: “+item);

Отфильтровав flux1, скажем, из элемента «2»Не удаляли этот элемент из потока. Я знаю, что издатель Flux является неизменным.

Если мы можем добавить к нему через слив, как мы можем удалить элемент из flux1?

1 Ответ

0 голосов
/ 08 декабря 2018

Правильное мышление дает правильные ответы

Думайте о Флюсе как о неизменном потоке сообщений.Это как река, вы можете добавить к ней немного воды, но вы не можете откатить воду, которую вы уже дали потоку.Однако вы можете отфильтровать эту воду в потоке.

В случае, если вам необходимо «удалить» недопустимые элементы из потока, вы можете отфильтровать их:

flux1.filter(e -> !e.equal("2"))
     .subscribe(item -> log.info(“item: “+item);

Поток не являетсяструктура данных, к которой мы привыкли, но это поток данных, который вы не можете изменить в точке подачи данных, но можете манипулировать тем, как они идут к конечной точке

...