Я пытаюсь использовать RX Java в приложении Scala для параллельного выполнения нескольких запросов
Код в Java, который успешно выполняется:
List<rx.Observable<Response<MyObject>>> observables = requests.stream().map(
request -> client.getResponse(request).collect(Collectors.toList());
Observable<MyObject> mergedObservable = Observable.merge(observables)
.flatMap(page -> {
List<MyObject> objects = page.getResults();
Observable<MyObject> objectsObservable = Observable.from(objects);
return objectsObservable;
}).toBlocking().getIterator()
Я хочу написать эквивалентэтого фрагмента в Scala и что у меня есть
val observables: Array[Observable[Response[MyObject]]] = requests.map(request => client.getResponse(request));
Observable.merge(observables.toList)
.flatMap[Observable[MyObject]]((page: Response[MyObject]) => {
val resutls: java.util.List[MyObject] = page.getResults
val resultObservable: Observable[MyObject] = Observable.from(resutls)
resultObservable
}).toBlocking
.getIterator
Код не выполняется, за исключением
overloaded method value flatMap with alternatives:
(x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$2: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$3: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$4: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
(x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$2: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$3: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
(x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$2: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
(x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]]
cannot be applied to (com.mypackage.Response[com.mypackage.MyObject] => rx.Observable[com.mypackage.MyObject])
Есть какие-нибудь мысли о том, как изменить код Scala, чтобы избежать этого?