Как правильно комбинировать RxJava / Vert.x Сборщики данных / Подписки GraphQL - PullRequest
0 голосов
/ 18 октября 2019

У меня есть следующий тестовый код:

  FlowableOnSubscribe<SomeObj> fos;

  private void init() {
    fos = emitter -> {
      try {
        while(true) {
          SomeObj someObj = readFromDataInputStream();
          emitter.onNext(someObj);
          System.out.println("Emitted object");
        }
      }
      catch(Exception e) {
        emitter.onError(e);
      }
    };
  }

  public Single<String> doWork() {
    Flowable<String> myFlow = Flowable.defer(() -> 
      Flowable.create(fos, BackpressureStrategy.BUFFER)
        .cache()
        .subscribeOn(Schedulers.io(), false)
        .doOnSubscribe(x -> doSomethingToTriggerDataInputStream())
        .map(x -> convertMyCustomObjectToString(x))
      );
    );

    // lastOrError/singleOrError end up blocking :(
    return myFlow.firstOrError(); 
  }

  public VertxDataFetcher<CompletionStage<String>> vertxDataFetcherTest() {
    return new VertxDataFetcher<>((env, future) -> {
      try {
        future.complete(doWork().to(SingleInterop.get()));
      }
      catch(Exception e) {
        future.fail();
      }
    });
  }

  public DataFetcher<CompletionStage<String>> dataFetcherTest() {
    return env -> doWork().to(SingleInterop.get());
  }

Если я запускаю пример кода, он зависает после первого успешного использования. Другими словами, он будет запускаться один раз при начальной загрузке веб-страницы, но если я обновлю браузер (Ctrl F5), он зависнет и больше не завершит вызов.

FWIW, с небольшой отладкой похоже, что вызовы defer / map происходят в RxCachedThreadScheduler1, а после обновления веб-страницы он зависает в вызове defer с RxCachedThreadScheduler2.

Если я переключаюсьиспользование отдельного входного потока для каждого вызова не приводит к зависанию. Вот пример, который я использовал ( RxJava: подача одного потока (наблюдаемого) в качестве ввода другого потока …).

Однако это не работает с моим дизайном, так как для него требуется 1 общее соединение, которое остается открытым все время. Это связано с тем, что я хочу, чтобы подписка GraphQL могла захватывать все данные, поступающие в поток ввода данных. Если у меня есть несколько / отдельные сокетные соединения, входной поток подписки GraphQL пропустит все, что отправлено во входные потоки без подписки. (Если не лучший вариант, чтобы любой другой входной поток также отправлялся в поток подписки GraphQL, что я не уверен, как это сделать ...)

В качестве идентификатора, имеет ли значение, если я использую VertxDataFetcherпротив DataFetcher в этом сценарии? В настоящее время я использую DataFetcher для запуска моего примера. Если мне нужно переключиться на VertxDataFetcher, я не уверен, как правильно приводить типы, чтобы этот метод работал.

1 Ответ

0 голосов
/ 29 октября 2019

Происходит много интересного.

Ваш излучатель явно блокируется, поэтому самое важное изменение, которое вы можете попробовать, это вызвать .observeOn(RxHelper.blockingScheduler(vertx)) при создании Flowable. Подробнее здесь о том, как использовать правильные планировщики.

Это, вероятно, не решит вашу проблему. Вы, вероятно, читаете данные в обычном для Java порядке ввода-вывода с readFromDataInputStream(), который может быть поточно-ориентированным, но почти наверняка не поддерживает одновременные программы чтения. Вы пытаетесь использовать неявные считыватели одновременно, пока readFromDataInputStream() в конечном итоге считывает данные из того же ввода-вывода. См. Java: одновременное чтение на InputStream

...