Хороший шаблон Scala для потребления из очереди, пока больше нет сообщений? - PullRequest
0 голосов
/ 25 апреля 2018

У меня есть очередь из N сообщений, отправленных Актером, я хочу использовать их все. Актер вернет либо тип Message, либо тип NoMessages, если очередь пуста.

Я придумал это, но не чувствую идиоматизма, и я не уверен, сколько потоков я раскручиваю каждый раз, когда звоню consume()?

Как лучше это сделать?

def main(): Unit = {

  val queue = system.actorOf(...)

  def consume(): Unit = {
    ask(queue, Read) foreach {
      case Message(m) => {
        // handle message
        consume()
      }
      case NoMessages =>  {
        system.shutdown()
      }
    }
  }
  consume()
}

1 Ответ

0 голосов
/ 25 апреля 2018

Если Message и NoMessages расширяют общую черту (назовем ее Msg), вы можете использовать Akka Streams :

import akka.Done
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util.Timeout
import scala.concurrent._
import scala.concurrent.duration._

implicit val system = ActorSystem("QueueSys")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val queue = system.actorOf(...)

def handleMessage(msg: Message): Unit = ???

implicit val askTimeout = Timeout(5.seconds)

val stream: Future[Done] = Source.fromIterator(() => Iterator.continually(Read))
  .ask[Msg](parallelism = 3)(queue) // adjust the parallelism as needed
  .takeWhile(_.isInstanceOf[Message])
  .runForeach(handleMessage)

stream.onComplete(_ => system.terminate())

Вышеуказанный поток будет постоянноотправлять Read сообщений субъекту queue и обрабатывать Message ответов, пока субъект не ответит NoMessage.

...