Как сделать поток источника Акка FIFO очереди? - PullRequest
0 голосов
/ 22 мая 2018

Я занимаюсь разработкой системы торговли акциями с использованием akka.Я предлагаю книги заявок в TradeQueue, как показано ниже:

val tradeQueue = Source.queue[TradeTask](1, OverflowStrategy.backpressure)
.map(task=>{
  println("TradeTask Start:"+task)
  task
})
.via(ProcessA)
.via(ProcessC)
.via(ProcessC)
.toMat(Sink.foreach(task => {
     log.info("TradeTask finish:"+task)
 }))(Keep.left).run()


 for (item <- 1 to 100) {
    val task = TradeTask(item)
    tradeQueue.offer(task)
 }   

Но последовательность неупорядочена.

примерно так:

Начало TradeTask: TradeTask (1)

TradeTask Начало : TradeTask (2)

Окончание TradeTask : TradeTask (1)

Окончание TradeTask : TradeTask (2)

НоЯ хочу, чтобы FIFO и элемент были поставлены в очередь перед предыдущим завершением ,, как это

Начало TradeTask: TradeTask (1)

Окончание TradeTask: TradeTask (1)

Начало TradeTask :TradeTask (2)

TradeTask окончание : TradeTask (2)

Как это сделать?Спасибо

1 Ответ

0 голосов
/ 22 мая 2018

Уже FIFO

Ваш вопрос уже доказывает, что очередь "(F) irst (I) n (F) irst (O) ut".Как показано в выходных данных, первый элемент для входа в поток, TradeTask(1), был первым элементом, который должен быть обработан Sink:

TradeTask Start:TradeTask(1)    // <-- FIRST IN

TradeTask Start:TradeTask(2)

TradeTask finish:TradeTask(1)   // <-- FIRST OUT

TradeTask finish:TradeTask(2)

Косвенный ответ

Вопрос, который вы задаете, полностью противоположен цели / использованию akka-stream.Вы спрашиваете, как создать поток akka, который выполняет всю обработку последовательно, а не асинхронно.

Все дело в akka: асинхронная обработка и связь :

Добро пожаловать в Akka, набор библиотек с открытым исходным кодом для разработки масштабируемых, отказоустойчивых систем, которыеохватывающие процессорные ядра и сети.

Если вы хотите, чтобы обработка выполнялась по одному, то зачем в первую очередь использовать akka?Синхронная обработка Iterable элементов легко выполняется без акки с использованием коллекций Scala и для-понимания:

val functionA : Task => Task = ???
val functionB : Task => Task = ???
val functionC : Task => Task = ???

val logTask : Task => Unit = 
  (task) => log.info("TradeTask finish:" + task)

for {
  item    <- 1 to 100
  task    <- TradeTask(item)
  aResult <- functionA(task)
  bResult <- functionB(aResult)
  cResult <- functionC(bResult)
} { 
  logTask(cResult)
}

Аналогично вы можете использовать составление функций и упрощенную итерацию:

val compositeFunction : Int => Unit = 
  TradeTask.apply andThen functionA andThen functionB andThen functionC andThen logTask

(1 to 100) foreach compositeFunction
...