Уже FIFO
Ваш вопрос уже доказывает, что очередь "(F) irst (I) n (F) irst (O) ut".Как показано в выходных данных, первый элемент для входа в поток, TradeTask(1)
, был первым элементом, который должен быть обработан Sink
:
TradeTask Start:TradeTask(1) // <-- FIRST IN
TradeTask Start:TradeTask(2)
TradeTask finish:TradeTask(1) // <-- FIRST OUT
TradeTask finish:TradeTask(2)
Косвенный ответ
Вопрос, который вы задаете, полностью противоположен цели / использованию akka-stream.Вы спрашиваете, как создать поток akka, который выполняет всю обработку последовательно, а не асинхронно.
Все дело в akka: асинхронная обработка и связь :
Добро пожаловать в Akka, набор библиотек с открытым исходным кодом для разработки масштабируемых, отказоустойчивых систем, которыеохватывающие процессорные ядра и сети.
Если вы хотите, чтобы обработка выполнялась по одному, то зачем в первую очередь использовать akka?Синхронная обработка Iterable
элементов легко выполняется без акки с использованием коллекций Scala и для-понимания:
val functionA : Task => Task = ???
val functionB : Task => Task = ???
val functionC : Task => Task = ???
val logTask : Task => Unit =
(task) => log.info("TradeTask finish:" + task)
for {
item <- 1 to 100
task <- TradeTask(item)
aResult <- functionA(task)
bResult <- functionB(aResult)
cResult <- functionC(bResult)
} {
logTask(cResult)
}
Аналогично вы можете использовать составление функций и упрощенную итерацию:
val compositeFunction : Int => Unit =
TradeTask.apply andThen functionA andThen functionB andThen functionC andThen logTask
(1 to 100) foreach compositeFunction