Обход Vertx JsonArray с использованием RxJava - PullRequest
0 голосов
/ 05 июня 2019

У меня есть статья, которая потребляет входящие сообщения.Каждое сообщение будет вершинным JsonObject, которое содержит вершинное значение JsonArray.Я хочу выполнить логику для каждого элемента в этом массиве.Сама логика содержится в отдельной статье.Эта вторая вертикаль использует rxVertx.Он определяет несколько потребителей, каждый из которых делегирует отдельные методы, каждый из которых возвращает Observable.

. Мой вопрос: как:

  1. пройти каждый элемент в JsonArray
  2. передать каждый элемент потребителю, который работает с Observables.

В первой статье попытались сделать следующее:

EventBus eb = rxVertx.eventBus();
JsonArray array= incomingMessage.getJsonArray(KEY);
List<Object> list  = array.getList();
Observable<Object> observable = Observable.fromArray(list);

observable.flatMapSingle(s -> {
      eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
   }).subscribe();

Вызов flatMapSingle не компилируется, потому что:

The method flatMapSingle(Function<? super Object,? extends SingleSource<? extends R>>) in the type Observable<Object> is not applicable for the arguments ((<no type> s) -> {})

Как правильно это сделать?Большое спасибо

1 Ответ

1 голос
/ 05 июня 2019

Если вы определяете параметр функции flatMapSingle с помощью блока кода, вы должны использовать ключевое слово return:

observable.flatMapSingle(s -> {
  return eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe(reply -> {
  // Handle each reply
});

Обратите внимание, что flatMapSingle не гарантирует, что ответы будут в том же порядке, что и входящие сообщения. Если вам нужна эта гарантия, используйте concatMapSingle.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...