Использование RxJava в Scala с flatmap - PullRequest
0 голосов
/ 17 апреля 2019

Я пытаюсь использовать RX Java в приложении Scala для параллельного выполнения нескольких запросов

Код в Java, который успешно выполняется:

List<rx.Observable<Response<MyObject>>> observables = requests.stream().map(
    request -> client.getResponse(request).collect(Collectors.toList());

Observable<MyObject> mergedObservable = Observable.merge(observables)
    .flatMap(page -> {
        List<MyObject> objects = page.getResults();
        Observable<MyObject> objectsObservable =  Observable.from(objects);
        return objectsObservable;
}).toBlocking().getIterator()

Я хочу написать эквивалентэтого фрагмента в Scala и что у меня есть

val observables: Array[Observable[Response[MyObject]]] = requests.map(request => client.getResponse(request));
Observable.merge(observables.toList)
       .flatMap[Observable[MyObject]]((page: Response[MyObject]) => {
       val resutls: java.util.List[MyObject] = page.getResults
       val resultObservable: Observable[MyObject] = Observable.from(resutls)
       resultObservable
    }).toBlocking
      .getIterator

Код не выполняется, за исключением

 overloaded method value flatMap with alternatives:
  (x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$2: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$3: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$4: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
  (x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$2: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$3: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
  (x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$2: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
  (x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]]
 cannot be applied to (com.mypackage.Response[com.mypackage.MyObject] => rx.Observable[com.mypackage.MyObject])

Есть какие-нибудь мысли о том, как изменить код Scala, чтобы избежать этого?

Ответы [ 2 ]

2 голосов
/ 17 апреля 2019

Что говорит ошибка, если вы внимательно прочитаете, так это то, что она ожидает увидеть что-то вроде Func1[Response[MyObject], Observable[Observable[MyObject]]] из-за параметра типа в flatMap.Удаление или изменение на flatMap[MyObject] может быть достаточно для решения проблемы (при условии, что вы используете Scala 2.12 или Scala 2.11 с -Xexperimental; в противном случае вы не можете напрямую использовать лямбду для Func и вам потребуетсявспомогательная функция).

Если это не так, я бы просто извлек лямбду и присвоил ей нужный тип (все еще предполагая Scala 2.12):

val f: Func1[Response[MyObject], Observable[MyObject]] = 
  page => Observable.from(page.getResults)

Observable.merge(observables.toList).flatMap(f).toBlocking.getIterator

Вы можете сделать это встроенныма также:

.flatMap({ page =>
   val resutls: java.util.List[MyObject] = page.getResults
   val resultObservable: Observable[MyObject] = Observable.from(resutls)
   resultObservable
}: Func1[Response[MyObject], Observable[MyObject]])...
0 голосов
/ 17 апреля 2019

Полагаю, я могу помочь, но просто возможно.Однажды у меня была похожая проблема, и причина была в том, что scala сначала использовала scala.function вместо java.fucntion, поэтому лямбда-выражения несовместимы.Я не собираюсь обещать, что это сработает, но использую внутренние классы вместо лямбда-классов.

...