RxJava Объединение нескольких наблюдателей после фильтра - PullRequest
0 голосов
/ 09 ноября 2018

мой текущий код

  private final List<Disposable> subscriptions = new ArrayList<>();

  for (Instrument instrument : instruments) {
    // Waiting for OrderBook to generate Reliable results.
    GenericBook Book =
        service
            .getBook(instrument.getData())
            .filter(gob -> onBookUpdate(gob))
            .blockingFirst();

    subscriptions.add(
        service
            .getBook(instrument.getData())
            .subscribe(
                gob -> {
                  try {
                    onBookUpdate(gob);
                  } catch (Exception e) {
                    logger.error("Error on subscription:", e);
                  }
                },
                e -> logger.error("Error on subscription:", e)));
  }

Итак, для каждого инструмента он сначала блокирует ожидание, пока вывод onBookUpdate(gob) не станет истинным. onBookUpdate(gob) возвращает логическое значение. Как только мы получим onBookUpdate в качестве истинного значения, я переведу этого подписчика в переменную подписок.

Это замедляется, так как мне приходится ждать каждый инструмент и затем переходить к следующему инструменту.

Моя цель - запустить все это параллельно, затем дождаться завершения всех и передать их в переменную подписок.

Я пробовал почтовый индекс, но не работал

  List<Observable<GenericOrderBook>> obsList = null;
  for (Instrument instrument : instruments) {
    // This throws nullException.
   obsList.add(service
            .getBook(instrument.getData())
            .filter(gob -> onBookUpdate(gob))
            .take(1));
    }
  }
// Some how wait over here until all get first onBookUpdate as true.
String o = Observable.zip(obsList, (i) -> i[0]).blockingLast();

Ответы [ 2 ]

0 голосов
/ 12 ноября 2018

Я предполагаю, что instruments будет списком. Если да, то вы можете сделать что-то вроде этого,

Observable
    .fromIterable(instruments)
    // Returns item from instrument list one by one and passes it to getBook()
    .flatmap(
        instrument -> getBook(instrument.getData())
    )
    .filter(
        gob -> onBookUpdate(gob)
    )
    // onComplete will be called if no items from filter 
    .switchIfEmpty(Observable.empty())
    .subscribe(
        onBookUpdateResponse -> // Do what you want,
        error -> new Throwable(error)
    );

Надеюсь, это поможет.

0 голосов
/ 09 ноября 2018

При использовании наблюдаемых и т. Д. Следует принимать их от всего сердца. Одной из предпосылок для охвата является отделение конфигурации и конструкции вашего трубопровода от его выполнения.

Другими словами, настройте конвейер заранее, а затем, когда данные доступны, отправьте данные через него.

Кроме того, охват наблюдаемых означает избегание циклов for.

Я не на 100% использую ваш вариант использования, но я бы предложил создать конвейер, который будет принимать инструмент в качестве входных данных и возвращать подписку ...

Так что-то вроде

service.getBook(instrument.getData())
 .flatMap(gob -> {
   onBookUpdate(gob);
   return gob;
});

Это вернет Observable, на который вы можете подписаться, и добавит результат в подписку.

Затем создайте наблюдаемое семя, которое закачивает в него объекты инструмента.

Не уверен в некоторых деталях вашего API, поэтому возвращайтесь ко мне, если это не ясно или я сделал неверное предположение.

...