У меня есть статья, которая потребляет входящие сообщения.Каждое сообщение будет вершинным JsonObject
, которое содержит вершинное значение JsonArray
.Я хочу выполнить логику для каждого элемента в этом массиве.Сама логика содержится в отдельной статье.Эта вторая вертикаль использует rxVertx
.Он определяет несколько потребителей, каждый из которых делегирует отдельные методы, каждый из которых возвращает Observable
.
. Мой вопрос: как:
- пройти каждый элемент в
JsonArray
- передать каждый элемент потребителю, который работает с
Observables
.
В первой статье попытались сделать следующее:
EventBus eb = rxVertx.eventBus();
JsonArray array= incomingMessage.getJsonArray(KEY);
List<Object> list = array.getList();
Observable<Object> observable = Observable.fromArray(list);
observable.flatMapSingle(s -> {
eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe();
Вызов flatMapSingle
не компилируется, потому что:
The method flatMapSingle(Function<? super Object,? extends SingleSource<? extends R>>) in the type Observable<Object> is not applicable for the arguments ((<no type> s) -> {})
Как правильно это сделать?Большое спасибо