Flux
является неизменным по своей структуре, поэтому, как вы и предполагали в этом вопросе, просто «обновить» существующий Flux
на месте невозможно.
Обычно я бы рекомендовал избегать использованияProcessor
напрямую. Однако это один из (редких) случаев, когда Processor
является, пожалуй, единственным разумным вариантом, поскольку вы, по сути, хотите публиковать элементы динамически на основе зарегистрированных производителей. Нечто похожее на:
class Hub<T> {
private final FluxProcessor<T, T> processor;
private final FluxSink<T> sink;
public Hub() {
this.processor = DirectProcessor.<T>create().serialize();
this.sink = processor.sink();
}
public Disposable registerProducer(Flux<T> flux) {
return flux.subscribe(sink::next);
}
public Flux<T> read() {
return processor;
}
}
Если вы хотите удалить продюсера, вы можете отслеживать Disposable
, возвращенный из registerProducer()
, и вызывать dispose()
для него, когда закончите.