Как вернуть CompletionStage из PublishSubject - PullRequest
0 голосов
/ 29 октября 2019

В Vert.x у меня есть тема публикации, на которую я хотел бы подписаться несколькими наблюдателями. Vert.x ожидает CompletionStage в результате. Как отправить значение от наблюдателя без блокировки? Вот простой пример того, что я пытаюсь сделать.

Пример:

class TestClass {
  private PublishSubject ps = PublishSubject.create();

  public TestClass() {
    init();
  }

  public Observable<Foo> init() {
    Observable<Foo> foo = getDataFromInputStream();
    foo.subscribe(new Subscriber<Foo> () {
      public void onNext(Foo foo) {
        // Basically copy all values to a PublishSubject
        // since we are dealing with a Java Input Stream
        ps.onNext(foo);
      }
      public void onError(Throwable t) {
        ps.onError(t)
      }

      // onComplete is never called since the stream lives on 'forever'
      public void onComplete() {
      }
    });
  }

  public DataFetcher<CompletionStage<Foo>> vertxCallOne() {
    return env -> doWork().to(SingleInterop.get());
  }

  public DataFetcher<CompletionStage<Bar>> vertxCallTwo() {
    return env -> doOtherWork().to(SingleInterop.get());
  }

  public Observable<Foo> doWork() {
    ps.filter(blahBlahBlah)
    .subscribe(new Subscriber<Foo> () {
        public void onNext(Foo foo) {
        }

        public void onError(Throwable t) {
        }

        public void onComplete() {
        }
    });
    return ???
  }

  public Observable<Bar> doOtherWork() {
    // Reuse the same PublishSubject as doWork(), but with 
    // different filter/mapping rules.
    ps.filter(blah)
    .filter(blah2)
    .map()
    .subscribe(new Subscriber<Bar> () {
        public void onNext(Bar bar) {
        }

        public void onError(Throwable t) {
        }

        public void onComplete() {
        }
    });
    return ???
  }

Любая помощь будет принята с благодарностью. Спасибо!

...