RxJava 2 - распараллелить события для выполнения с побочными эффектами - PullRequest
0 голосов
/ 06 января 2019

Я хотел бы распараллелить выполнение без агрегирования событий в конце. Я новичок в RxJava и пытаюсь оценить, соответствует ли это моим потребностям.

Я знаю, как сделать нормальное непараллельное выполнение. Для простоты начнем с:

taskReader.stream() // returns Flowable<Task>
    .subscribe(this::processTask) // sends results to another micro service

Это хорошо работает в упорядоченном, блокирующем виде. Однако я хотел бы распараллелить это и выполнить все задачи одновременно. Документация в основном говорит вам использовать flatMap, что имеет смысл, но я не могу понять, как запустить все это. Посмотрим:

taskReader.stream() // returns Flowable<Task> 
    .flatMap(
        Flowable.just(task)
            .subscribeOn(Schedulers.computation())
            .map(this::processTask)
    )

Это то, что мне нужно, но, конечно, это не запускает Flowable, так как на него ничего не подписано [1]. Все мои задачи не связаны, поэтому мне не нужно снова объединять их в поток верхнего уровня, и мне, конечно, нет дела до их порядка.

Как вы начинаете Flowable таким образом? Я не хочу подписываться на верхний уровень Flowable, так как мне больше там ничего не нужно делать.

В качестве альтернативы, документы говорят вам использовать параллельные потоки:

taskReader.stream() // returns Flowable<Task> 
    .parallel()
    .runOn(Schedulers.computation())
    .map(this::processTask)
    .sequential();

И снова, мне не нужно ничего упорядочивать в конце, так как меня не волнует порядок. Кроме того, не хочу подписываться на это, поскольку работа на карте - все, что мне нужно.

Что бы я действительно хотел, это:

taskReader.stream() // returns Flowable<Task> 
    .flatMap(
        Flowable.just(task)
            .subscribeOn(Schedulers.computation())
            .subscribe(this::processTask)
    )

Но это не то, как работает RxJava.

Помощь?

Некоторые предыстории: я получаю события из очереди (в частности, AWS SQS), где события не связаны друг с другом. Для каждого из них мне нужно выполнить некоторую интенсивную работу ввода-вывода, затем некоторую интенсивную работу с процессором и, наконец, отправить результаты в другую систему. Я хотел бы запускать все эти события параллельно, поэтому мне не нужно упорядочивать только хорошее планирование для вычислений и работы. Я думал, что RxJava поможет мне в этом, но, возможно, я пытаюсь использовать не тот инструмент для этой работы, поскольку я борюсь с первым шагом.

[1] Это то, что мы получаем из документов, если я не ищу это неправильно. Это действительно единственная документация, которая у нас есть - https://github.com/ReactiveX/RxJava/wiki? Я ожидаю, что у меня будет что-то вроде того, что есть в Reactor: https://projectreactor.io/docs/core/release/reference/

1 Ответ

0 голосов
/ 06 января 2019

Измените processTask () на что-то похожее на Текущий поток,

public Flowable<ProcessTask> processTask() {
    return Flowable
        .create(
            e -> {
                //Put the complete code inside this block which you have currently inside your processTask() method
                //Whatever processed output you get pass it as shown below.
                e.onNext(//Pass <ProcessTask> object);
            },
            BackpressureStrategy.BUFFER //Choose any BackpressureStrategy which suits your requirement
        )
        .subscribeOn(Schedulers.io());
}

Итак, в принципе, processTask () также возвращает поток Flowable.

Теперь вы можете использовать flatMap (),

taskReader
    .stream() // returns Flowable<Task>
    .flatMap(task -> processTask(task))
    .subscribe(
        result -> //Processed task, result from e.onNext(),
        error -> //Error
    );

Надеюсь, это поможет.

...