Последовательность сообщений akka router - PullRequest
0 голосов
/ 20 октября 2018

Используя маршрутизаторы akka, мне нужно выполнить две разные операции.Но когда я передаю сообщения, они перекрываются.Вот мой код.

class Master extends Actor {

    import context._

    val numRoutees = 3
    val router: ActorRef = actorOf (RoundRobinPool (numRoutees).props(Props[Worker]), "router")

    // broadcasts GetString() and receives a random string from each routee
    def stringMessages(replies: Set[String] = Set()): Receive = {
        case GetString() =>
            router ! Broadcast(GetString())     // g
        case reply: String =>
            val updatedReplies = replies + reply
            if (updatedReplies.size == numRoutees) {
                println("result = " + updatedReplies.mkString("[", ",", "]"))
            }
            become(stringMessages(updatedReplies))

        case GetInteger() =>
            become(intMessages())
            //    self ! createArray()  // h      // <- uncommenting this results in an infinte loop

        case _ => println("stringMessages: no matches")
    }

    // broadcasts GetInteger and receives a random integer from each routee
    def intMessages(ints: Set[Int] = Set()): Receive = {
        case GetInteger() =>
            router ! Broadcast(GetInteger())    // e
        case n: Int =>
            val updatedInts = ints + n
            if (updatedInts.size == numRoutees) {
                println("result = " + updatedInts.mkString("[", ",", "]"))
            }
            become(intMessages(updatedInts))

        case GetString() =>
            become(stringMessages())
            self ! GetString()                  // f

        case _ => println("intMessages: no matches")
    }

    override def receive: Receive =
    {
        case GetString() =>
            become(stringMessages())
            self ! GetString()      // c
        case GetInteger() =>
            become(intMessages())
            self ! GetInteger()     // d
        case _ => println("root doesn't match")
    }
}


object MasterTest extends App {
    val system = ActorSystem ("ActorSystem")
    val actor = system.actorOf(Props[Master], "root")

    actor ! GetInteger()        // a
    actor ! GetString()         // b
}

С некоторыми отладочными утверждениями я понимаю, что порядок выполнения может быть в порядке a -> b -> f -> g.(Обратите внимание на идентификаторы утверждений, прокомментированные в коде).Код не делает то, что я ожидаю.Выходные данные

result = [a,b,c]

Как заставить их выполнить в порядке a -> d-> e -> b -> f -> g.Если я добавлю Thread.sleep как

actor ! GetInteger()        // a
Thread.sleep(3000)
actor ! GetString()         // b

, я получу ожидаемый результат, то есть

result = [0,4,6]    // random integers
result = [a,b,c]    // random strings

Как сделать так, чтобы сообщения актера выстраивались в очередь таким образом, чтобы новоевыполняется только после того, как предыдущий полностью выполнен.Как лучше реализовать то, что я делаю с become()?Если я хочу добавить больше состояний в код (например, GetInteger и GetString здесь), код становится слишком избыточным, чтобы изменять состояния, используя become(newState), от одного к другому.

Также, если я раскомментируюh код превращается в бесконечный цикл с a -> b -> f -> d -> h -> f -> h -> f -> ....Поэтому я понимаю, что это неправильная реализация.

1 Ответ

0 голосов
/ 20 октября 2018

Одна идея состоит в том, чтобы кодировать ответы String и Int в одном поведении Receive.Например:

case object GetInteger
case object GetString

// ...

def handleMessages(intReplies: Set[Int] = Set(), strReplies: Set[String] = Set()): Receive = {
  case GetInteger =>
    router ! Broadcast(GetInteger)
  case GetString =>
    router ! Broadcast(GetString)
  case i: Int =>
    val updatedInts = intReplies + i
    if (updatedInts.size == numRoutees) {
      println("result = " + updatedInts.mkString("[", ",", "]"))
    }
    become(handleMessages(updatedInts, strReplies))
  case str: String =>
    val updatedStrings = strReplies + str
    if (updatedStrings.size == numRoutees) {
      println("result = " + updatedStrings.mkString("[", ",", "]"))
    }
    become(handleMessages(intReplies, updatedStrings))
  case x =>
    println("Not an Int or String: " + x)
}

def receive = handleMessages

Обратите внимание, что я изменил GetInteger и GetString на объекты case вместо классов case, потому что у них нет параметров.Это позволяет вам убрать скобки в конце (т. Е. Вы можете использовать GetInteger вместо GetInteger()).

Кроме того, если вы беспокоитесь о порядке, рассмотрите возможность использования упорядоченной коллекции, такой какscala.collection.immutable.Seq вместо Set (который неупорядочен).

...