RxJava: как обрабатывать события в нескольких потоках - PullRequest
0 голосов
/ 16 мая 2018

Например, у меня есть следующий код, который использует библиотеку RxJava:

public class MultithreadingExample {
public static void main(String[] args) throws InterruptedException {
    Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
            .observeOn(Schedulers.computation())
            .map(numberToString())
            .subscribe(printResult());
    Thread.sleep(10000);
}

private static Func1<Integer, String> numberToString() {
    return number -> {
        System.out.println("Operator thread: " + Thread.currentThread().getName());
        return String.valueOf(number);
    };
}

private static Action1<String> printResult() {
    return result -> {
        System.out.println("Subscriber thread: " + Thread.currentThread().getName());
        System.out.println("Result: " + result);
    };
}

}

И я хочу, чтобы события в Observer обрабатывались несколькими потоками, например, item'1' для Thread-1, элемент '2' для Thread-2 и т. Д.

Каков наилучший способ сделать это с RxJava?

1 Ответ

0 голосов
/ 17 мая 2018

Вы можете использовать оператор flatMap().

Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
        .flatMap( number -> Observable.defer( numberToString() )
                              .subscribeOn( Schedulers.computation() ) )
        .observeOn(Schedulers.computation())
        .map(numberToString())
        .subscribe(printResult());

Оператор flatMap() подпишется на новое наблюдаемое в (вероятно, новом) потоке, объединяя результаты обратно в поток, гдефинал observeOn() готов.

...