В настоящее время я использую rx-java 2, и у меня есть сценарий использования, когда один подписчик Camel Route должен использовать несколько наблюдаемых.
Используя это решение в качестве ссылки, у меня есть частично рабочее решение. RxJava - Объединенная наблюдаемая, которая принимает больше наблюдаемых в любое время?
Я планирую использовать PublishProcessor<T>
, который будет подписан на одного подписчика реактивного потока верблюдов, а затем поддерживать ConcurrentHashSet<Flowable<T>>
, где я могу динамически добавлять новые Наблюдаемые.
В настоящее время я застрял на том, как я могу добавить / управлять Flowable<T>
экземплярами с PublishProcessor?
Я действительно новичок в RX Java, поэтому любая помощь приветствуется! Это то, что я до сих пор:
PublishProcessor<T> publishProcessor = PublishProcessor.create();
CamelReactiveStreamsService camelReactiveStreamsService =
CamelReactiveStreams.get(camelContext);
Subscriber<T> subscriber =
camelReactiveStreamsService.streamSubscriber("t-class",T.class);
}
Set<Flowable<T>> flowableSet = Collections.newSetFromMap(new ConcurrentHashMap<Flowable<T>, Boolean>());
public void add(Flowable<T> flowableOrder){
flowableSet.add(flowableOrder);
}
public void subscribe(){
publishProcessor.flatMap(x -> flowableSet.forEach(// TODO)
}) .subscribe(subscriber);
}