Я хотел бы распараллелить выполнение без агрегирования событий в конце. Я новичок в RxJava и пытаюсь оценить, соответствует ли это моим потребностям.
Я знаю, как сделать нормальное непараллельное выполнение. Для простоты начнем с:
taskReader.stream() // returns Flowable<Task>
.subscribe(this::processTask) // sends results to another micro service
Это хорошо работает в упорядоченном, блокирующем виде. Однако я хотел бы распараллелить это и выполнить все задачи одновременно. Документация в основном говорит вам использовать flatMap, что имеет смысл, но я не могу понять, как запустить все это. Посмотрим:
taskReader.stream() // returns Flowable<Task>
.flatMap(
Flowable.just(task)
.subscribeOn(Schedulers.computation())
.map(this::processTask)
)
Это то, что мне нужно, но, конечно, это не запускает Flowable, так как на него ничего не подписано [1]. Все мои задачи не связаны, поэтому мне не нужно снова объединять их в поток верхнего уровня, и мне, конечно, нет дела до их порядка.
Как вы начинаете Flowable таким образом? Я не хочу подписываться на верхний уровень Flowable, так как мне больше там ничего не нужно делать.
В качестве альтернативы, документы говорят вам использовать параллельные потоки:
taskReader.stream() // returns Flowable<Task>
.parallel()
.runOn(Schedulers.computation())
.map(this::processTask)
.sequential();
И снова, мне не нужно ничего упорядочивать в конце, так как меня не волнует порядок. Кроме того, не хочу подписываться на это, поскольку работа на карте - все, что мне нужно.
Что бы я действительно хотел, это:
taskReader.stream() // returns Flowable<Task>
.flatMap(
Flowable.just(task)
.subscribeOn(Schedulers.computation())
.subscribe(this::processTask)
)
Но это не то, как работает RxJava.
Помощь?
Некоторые предыстории: я получаю события из очереди (в частности, AWS SQS), где события не связаны друг с другом. Для каждого из них мне нужно выполнить некоторую интенсивную работу ввода-вывода, затем некоторую интенсивную работу с процессором и, наконец, отправить результаты в другую систему. Я хотел бы запускать все эти события параллельно, поэтому мне не нужно упорядочивать только хорошее планирование для вычислений и работы. Я думал, что RxJava поможет мне в этом, но, возможно, я пытаюсь использовать не тот инструмент для этой работы, поскольку я борюсь с первым шагом.
[1] Это то, что мы получаем из документов, если я не ищу это неправильно. Это действительно единственная документация, которая у нас есть - https://github.com/ReactiveX/RxJava/wiki? Я ожидаю, что у меня будет что-то вроде того, что есть в Reactor: https://projectreactor.io/docs/core/release/reference/