У меня есть следующий тестовый код:
FlowableOnSubscribe<SomeObj> fos;
private void init() {
fos = emitter -> {
try {
while(true) {
SomeObj someObj = readFromDataInputStream();
emitter.onNext(someObj);
System.out.println("Emitted object");
}
}
catch(Exception e) {
emitter.onError(e);
}
};
}
public Single<String> doWork() {
Flowable<String> myFlow = Flowable.defer(() ->
Flowable.create(fos, BackpressureStrategy.BUFFER)
.cache()
.subscribeOn(Schedulers.io(), false)
.doOnSubscribe(x -> doSomethingToTriggerDataInputStream())
.map(x -> convertMyCustomObjectToString(x))
);
);
// lastOrError/singleOrError end up blocking :(
return myFlow.firstOrError();
}
public VertxDataFetcher<CompletionStage<String>> vertxDataFetcherTest() {
return new VertxDataFetcher<>((env, future) -> {
try {
future.complete(doWork().to(SingleInterop.get()));
}
catch(Exception e) {
future.fail();
}
});
}
public DataFetcher<CompletionStage<String>> dataFetcherTest() {
return env -> doWork().to(SingleInterop.get());
}
Если я запускаю пример кода, он зависает после первого успешного использования. Другими словами, он будет запускаться один раз при начальной загрузке веб-страницы, но если я обновлю браузер (Ctrl F5), он зависнет и больше не завершит вызов.
FWIW, с небольшой отладкой похоже, что вызовы defer / map происходят в RxCachedThreadScheduler1, а после обновления веб-страницы он зависает в вызове defer с RxCachedThreadScheduler2.
Если я переключаюсьиспользование отдельного входного потока для каждого вызова не приводит к зависанию. Вот пример, который я использовал ( RxJava: подача одного потока (наблюдаемого) в качестве ввода другого потока …).
Однако это не работает с моим дизайном, так как для него требуется 1 общее соединение, которое остается открытым все время. Это связано с тем, что я хочу, чтобы подписка GraphQL могла захватывать все данные, поступающие в поток ввода данных. Если у меня есть несколько / отдельные сокетные соединения, входной поток подписки GraphQL пропустит все, что отправлено во входные потоки без подписки. (Если не лучший вариант, чтобы любой другой входной поток также отправлялся в поток подписки GraphQL, что я не уверен, как это сделать ...)
В качестве идентификатора, имеет ли значение, если я использую VertxDataFetcherпротив DataFetcher в этом сценарии? В настоящее время я использую DataFetcher для запуска моего примера. Если мне нужно переключиться на VertxDataFetcher, я не уверен, как правильно приводить типы, чтобы этот метод работал.