RxJava -2 Observables, который принимает больше Observables в любое время? - PullRequest
0 голосов
/ 02 мая 2018

В настоящее время я использую rx-java 2, и у меня есть сценарий использования, когда один подписчик Camel Route должен использовать несколько наблюдаемых. Используя это решение в качестве ссылки, у меня есть частично рабочее решение. RxJava - Объединенная наблюдаемая, которая принимает больше наблюдаемых в любое время?

Я планирую использовать PublishProcessor<T>, который будет подписан на одного подписчика реактивного потока верблюдов, а затем поддерживать ConcurrentHashSet<Flowable<T>>, где я могу динамически добавлять новые Наблюдаемые.
В настоящее время я застрял на том, как я могу добавить / управлять Flowable<T> экземплярами с PublishProcessor? Я действительно новичок в RX Java, поэтому любая помощь приветствуется! Это то, что я до сих пор:

PublishProcessor<T> publishProcessor = PublishProcessor.create();
CamelReactiveStreamsService camelReactiveStreamsService = 
CamelReactiveStreams.get(camelContext);
Subscriber<T> subscriber = 
     camelReactiveStreamsService.streamSubscriber("t-class",T.class);
}
Set<Flowable<T>> flowableSet = Collections.newSetFromMap(new ConcurrentHashMap<Flowable<T>, Boolean>());

public void add(Flowable<T> flowableOrder){
    flowableSet.add(flowableOrder);
}

public void subscribe(){
    publishProcessor.flatMap(x -> flowableSet.forEach(// TODO)
    }) .subscribe(subscriber);
}

1 Ответ

0 голосов
/ 02 мая 2018

Вы можете иметь один Processor и подписаться на несколько наблюдаемых потоков. Вам нужно будет управлять подписками, добавляя и удаляя их по мере добавления и удаления наблюдаемых.

PublishProcessor<T> publishProcessor = PublishProcessor.create();

Map<Flowable<T>, Disposable> subscriptions = new ConcurrentHashMap<>();

void addObservable( Flowable<T> flowable ) {
  subscriptions.computeIfAbsent( flowable, fkey -> 
    flowable.subscribe( publishProcessor ) );
}
void removeObservable( Flowable<T> flowable ) {
  Disposable d = subscriptions.remove( flowable );
  if ( d != null ) {
    d.dispose();
  }
}
void close() {
  for ( Disposable d: subscriptions.values() ) {
    d.dispose();
  }
}

Используйте текучую среду как ключ к карте и добавляйте или удаляйте подписки.

...