У меня проблемы с выяснением, как правильно подключить вызов OnSext () Observables с помощью SingleInterop. Я также не уверен, как вернуть ожидаемый результат, не блокируя основной поток. Моя грубая попытка показана ниже. Любая помощь будет оценена.
public class SomeClass {
private ExecutorService executor = Executors.newSingleThreadExecutor;
private ObservableOnSubscribe<MyCustomObj> disHandler;
public SomeClass() {
init();
}
/** Create an observable to listen to a data input stream **/
private void init() {
disHandler = emitter ->
executor.submit(() -> {
try {
while (true) {
MyCustomObj mco = readFromDataInputStream();
emitter.onNext(mco);
}
// Should never complete since always listening...
emitter.onComplete();
}
catch(Exception e) {
emitter.onError(e);
}
});
}
public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
// Eventually the disHandler will send an onNext() as
// a result of doSomethingToBackend() call.
Observable<String> observer = Observable.fromCallable(doSomethingToBackend(someObj));
observer.subscribe(item -> item);
return ???.to(SingleInterop.get());
}
}
public class GraphQLVertx {
public VertxDataFetcher<Single<String>> dataFetcherTest() {
return new VertxDataFetcher<>((env, future) -> {
try {
future.complete(someClass.invokeSomethingAndGetResponseFromHandler("Blah");
}
catch(Exception e) {
future.fail();
}
});
}
}