Подключение наблюдаемого к SingleInterop в Vert.x - PullRequest
0 голосов
/ 16 октября 2019

У меня проблемы с выяснением, как правильно подключить вызов OnSext () Observables с помощью SingleInterop. Я также не уверен, как вернуть ожидаемый результат, не блокируя основной поток. Моя грубая попытка показана ниже. Любая помощь будет оценена.

public class SomeClass {
  private ExecutorService executor = Executors.newSingleThreadExecutor;
  private ObservableOnSubscribe<MyCustomObj> disHandler;

  public SomeClass() {
    init();
  }

  /** Create an observable to listen to a data input stream **/
  private void init() {
    disHandler = emitter ->
      executor.submit(() -> {
        try {
          while (true) { 
            MyCustomObj mco = readFromDataInputStream();
            emitter.onNext(mco);
          }
          // Should never complete since always listening...
          emitter.onComplete();
        }
        catch(Exception e) {
          emitter.onError(e);
        }
      });
  }
  public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
    // Eventually the disHandler will send an onNext() as 
    // a result of doSomethingToBackend() call.

    Observable<String> observer = Observable.fromCallable(doSomethingToBackend(someObj));
    observer.subscribe(item -> item);

    return ???.to(SingleInterop.get());
  }
}

public class GraphQLVertx {

  public VertxDataFetcher<Single<String>> dataFetcherTest() {
    return new VertxDataFetcher<>((env, future) -> {
      try {
        future.complete(someClass.invokeSomethingAndGetResponseFromHandler("Blah");
      }
      catch(Exception e) {
        future.fail();
      }
    });
  }
}

1 Ответ

1 голос
/ 17 октября 2019

У наблюдаемой есть вспомогательный метод, который возвращает первый испущенный элемент как одиночный, или как ошибка, поэтому, если у вас уже есть Observable<String>, вы можете преобразовать его в Single<String> следующим образом:

public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
    return Observable.fromCallable(doSomethingToBackend(someObj))
        .singleOrError();
  }
...