Динамичный набор издателей, все излучающие через один поток - PullRequest
1 голос
/ 17 октября 2019

Я пытаюсь создать своего рода сервис-концентратор, который может излучать через горячий поток (выход), но вы также можете зарегистрировать / отменить регистрацию производителей / издателей потока (вход)

Я знаю, что могу сделать что-то вроде:

    class Hub<T> {
        /**
         * @return unregister function
         */
        Function<Void, Void> registerProducer(final Flux<T> flux) { ... }

        Disposable subscribe(Consumer<? super T> consumer) {
            if (out == null) { 
                // obviously this will not work!
                out = Flux.merge(producer1, producer2, ...).share();
            }
            return out;
        }
    }

... но как эти "производители" зарегистрированы и незарегистрированы, как мне добавить новый источник потока в существующий подписанный поток? или удалить из него незарегистрированный источник?

TIA!

1 Ответ

1 голос
/ 18 октября 2019

Flux является неизменным по своей структуре, поэтому, как вы и предполагали в этом вопросе, просто «обновить» существующий Flux на месте невозможно.

Обычно я бы рекомендовал избегать использованияProcessor напрямую. Однако это один из (редких) случаев, когда Processor является, пожалуй, единственным разумным вариантом, поскольку вы, по сути, хотите публиковать элементы динамически на основе зарегистрированных производителей. Нечто похожее на:

class Hub<T> {

    private final FluxProcessor<T, T> processor;
    private final FluxSink<T> sink;

    public Hub() {
        this.processor = DirectProcessor.<T>create().serialize();
        this.sink = processor.sink();
    }

    public Disposable registerProducer(Flux<T> flux) {
        return flux.subscribe(sink::next);
    }

    public Flux<T> read() {
        return processor;
    }
}

Если вы хотите удалить продюсера, вы можете отслеживать Disposable, возвращенный из registerProducer(), и вызывать dispose() для него, когда закончите.

...