Java Поток по сравнению с наблюдаемым / поведенческим субъектом - PullRequest
2 голосов
/ 19 июня 2020

Мой вопрос в том, имеет ли Flux возможность вести себя как Observable или BehaviorSubject. Я думаю, что понимаю суть того, что делает Flux и как, но каждый учебник, который я вижу, создает поток содержимого stati c, то есть некоторый уже существующий массив чисел, которые конечны по своей природе.

Однако , Я хочу, чтобы мой поток был потоком неизвестных значений с течением времени ... например, Observable или BehaviorSubject. С их помощью вы можете создать такой метод, как setNextValue (String value), и передать эти значения всем подписчикам Observable / BehaviorSubject et c.

Возможно ли это с помощью Flux? Или поток значений должен сначала состоять из потока значений наблюдаемого типа?

Обновление

Я ответил на свой вопрос реализацией ниже. Принятый ответ, вероятно, приведет к тому же пути, но немного сложнее.

Ответы [ 2 ]

0 голосов
/ 20 июня 2020

Это можно сделать так:

private EmitterProcessor<String> processor;
private FluxSink<String> statusSink;
private Flux<String> status;

public constructor() {
    this.processor = EmitterProcessor.create();
    this.statusSink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
    this.status = this.processor.publish().autoConnect();
}

public Flux<String> getStatus() {
    return this.status;
}

public void setStatus(String status) {
    this.statusSink.next(status);
}
0 голосов
/ 19 июня 2020

каждый учебник, который я вижу, создает поток статического c содержимого, то есть некоторый уже существующий массив чисел, которые конечны по своей природе.

Вы увидите это, потому что большинство учебные пособия сосредоточены на том, как манипулировать и использовать Flux, но смысл здесь (что вы можете просто использовать Flux с stati c, содержимым фиксированной длины) и неудачен, и неверен. Он намного мощнее этого, и использование его с таким c контентом почти наверняка не , как вы его видите в реальном мире.

По сути, есть 3 разных способа создание Flux для динамического испускания элементов, как вы описываете:

Однако я хочу, чтобы мой Flux был потоком неизвестных значений с течением времени ... например, Observable или BehaviorSubject. С их помощью вы можете создать такой метод, как setNextValue (String value), и передать эти значения всем подписчикам Observable / BehaviorSubject et c.

Абсолютно точно - взгляните на Flux.push(). Это открывает эмиттер, и emitter.next(value) может вызываться всякий раз, когда вы используете sh. Этот поток может go включаться столько, сколько вы хотите (бесконечно, если хотите.) Flux.create() по сути является многопоточным вариантом Flux.push(), который также может быть полезен.

Flux.generate() тоже стоит взглянуть - это немного похоже на версию Flux.push() «по запросу», где вы отправляете следующий элемент только через обратный вызов, когда нижестоящий потребитель запрашивает его, а не отправляет, когда вы хотите. Это не всегда практично, но имеет смысл использовать этот метод, если вариант использования делает это возможным, поскольку он учитывает противодавление и, таким образом, можно гарантировать, что он не перегружает потребителя большим количеством запросов, чем он может обработать.

...