Как мне поставить работу в очередь в нескольких издателях в Reactor 3? - PullRequest
0 голосов
/ 16 октября 2018

Я создаю библиотеку для создания рабочих процессов обработки данных с использованием Reactor 3. Каждая задача будет иметь входной поток и выходной поток.Входной поток предоставляется пользователем.Выходной поток создается библиотекой.Задачи могут быть объединены в цепочки, чтобы сформировать DAG.Примерно так: (это в Kotlin)

val base64 = task<String, String>("base64") {
    input { Flux.just("a", "b", "c", "d", "e") }
    outputFn { ... get the output values ... }
    scriptFn { ... do some stuff ... }
}

val step2 = task<List<String>, String>("step2") {
    input { base64.output.buffer(3) }
    outputFn { ... }
    scriptFn { ... }
}

У меня есть требование ограничить параллелизм для всего рабочего процесса.Только настроенное количество входов может быть обработано одновременно.В приведенном выше примере для предела 3 это будет означать, что задача base64 будет сначала запускаться со входами «a», «b» и «c», затем ждать завершения каждого из них, прежде чем обрабатывать «d», «e» и «step2 "tasks.

Как я могу применить такие ограничения при создании выходных потоков из входных потоков?Может ли TopicProcessor как-то применяться?Может быть, какой-то пользовательский планировщик или процессор?Как будет работать противодавление?Нужно ли беспокоиться о создании буфера?

1 Ответ

0 голосов
/ 23 октября 2018

Противодавление распространяется от конечного амортизатора вверх по всей цепи.Но операторы в цепочке могут запрашивать данные заранее (предварительную выборку) или даже «переписывать» запрос.Например, в случае buffer(3), если этот оператор получает request(1), он будет выполнять request(3) в восходящем направлении ("1 буфер == макс. 3 элемента, поэтому я могу запросить свой источник достаточно, чтобы заполнить 1 буфер, который мне был запрошен").").

Если входные данные всегда предоставляются пользователем, их будет трудно абстрагировать ...

Нет простого способа оценить ограничение источников для нескольких конвейеров или даже несколькихподписки на данный конвейер (a Flux).

Использование общего Scheduler в нескольких publishOn не будет работать, поскольку publishOn выбирает поток Worker и придерживается его.

Однако, если ваш вопрос более конкретно касается ограничения задачи base64, возможно, эффект можно получить из параметра одновременности flatMap?

input.flatMap(someString -> asyncProcess(someString), 3, 1);

Это позволит максимально 3вхождения asyncProcess запускаются, и каждый раз, когда один завершается, он начинает новое со следующего значения из input.

...