Асинхронно собирать и составлять ответы из списка Akka Actors - PullRequest
0 голосов
/ 06 августа 2020

У меня есть Akka Actor с именем Gate, который отвечает на сообщение Status ответным сообщением Open или Closed:

"A stateless gate" must {
    "be open" in {
      val parent = TestProbe()
      val gate = parent.childActorOf(
        TestStatelessGate.props(7)
      )
      gate ! 7
      gate ! Gate.Status
      parent.expectMsg(Gate.Open)
    }

Я хотел бы создать логический элемент И, который запрашивает список ворот, возвращая Open, если все они открыты:

"A logical AND gate" must {
    "be open when all children are open" in {
      val parent = TestProbe()
      val parent2 = TestProbe()
      val gate_1 = parent.childActorOf(
        TestStatelessGate.props(7)
      )
      val gate_2 = parent.childActorOf(
        TestStatelessGate.props(5)
      )
      val gate_list = List(gate_1, gate_2)
      val and_gate = parent2.childActorOf(
        LogicalAndGate.props(gate_list)
      )
      gate_1 ! 7
      gate_2 ! 5
      and_gate ! Gate.Status
      parent2.expectMsg(Gate.Open)

В документации Scala есть приятный момент об использовании выражения for и pipe здесь . Соответствующая часть этой документации:

final case class Result(x: Int, s: String, d: Double)
case object Request

implicit val timeout = Timeout(5 seconds) // needed for `?` below

val f: Future[Result] =
  for {
    x <- ask(actorA, Request).mapTo[Int] // call pattern directly
    s <- actorB.ask(Request).mapTo[String] // call by implicit conversion
    d <- (actorC ? Request).mapTo[Double] // call by symbolic name
  } yield Result(x, s, d)

f.pipeTo(actorD

Я зависаю, пытаясь сделать что-то вроде этого со списком ActorRefs (gate_list в приведенном ниже коде):

override def receive: Receive = {
    case Status => {
      val futures: Seq[Future[Any]] =
        for (g <- gate_list)
          yield ask(g, Status)
      val all_open: Future[Boolean] = Future {
        !futures.contains(Closed)
        }
      pipe(all_open) to parent
    }
  }

Конечно, это не сработает, потому что futures.contains(Closed) сравнивает два разных типа вещей, Future[Any] и мой объект case.

1 Ответ

1 голос
/ 06 августа 2020

Я предполагаю, что Open и Closed - это case object значения, которые наследуются от некоторой общей черты OpenClosed.

Во-первых, вам нужно использовать mapTo для преобразования результата ask на OpenClosed. Я бы также использовал map вместо for:

val futures: Seq[Future[OpenClosed]] =
  gate_list.map(g => ask(g, Status).mapTo[OpenClosed])

Тогда вам нужно Future.sequence, чтобы дождаться завершения всех этих действий:

Future.sequence(futures).onComplete {
  case Success(res) =>
    parent ! res.forall(_ == Open)
  case Failure(_) =>
    parent ! Closed
}
...