Ответ с несколькими ответами в Акке - PullRequest
0 голосов
/ 03 июля 2018

У меня есть связь с одним актером:

val future = ask(foo, Bar())(1 seconds).mapTo[Row]
val result = Await.result(future, 1 seconds)

Это прекрасно работает, пока я отправляю только один Row из foo. Как бы я ответил с несколькими Row с:

sender() ! Row()
sender() ! Row()

впоследствии в foo?

Ответы [ 3 ]

0 голосов
/ 09 июля 2018

Вы можете ответить только одним объектом. Как насчет ответа на список?

class Foo extends Actor {
  def receive = {
    case Bar(): sender() ! List.of(Row(), Row())
  }
}

val future = ask(foo, Bar())(1 seconds).mapTo[List[Row]]
val result = Await.result(future, 1 seconds)

val row0 = result(0);
val row1 = result(1);
0 голосов
/ 10 июля 2018

Я не уверен на 100%, чего вы надеетесь достичь, без дополнительного контекста. Я подозреваю, что ответ @ lasekio , скорее всего, именно то, что вы ищете.

Но если вы ищете решение для асинхронной потоковой передачи, то есть если вы хотите обрабатывать строки по мере их поступления, а не ждать, пока они все не станут доступными (возможно, потому, что в памяти больше строк, чем уместится - миллионы / миллиарды), тогда, ну, тогда вы должны использовать Akka Streams. Но так как кажется, что вы против этого, вот альтернатива, которая является чистой Akka + Futures:

Актер будет возвращать строку, а также будущее следующей строки. Поскольку вам нужен способ указать, что строк больше нет, я обертываю результат в Option в моем примере, где Some содержит Row и будущее следующего Row, а None означает, что больше нет строк. Вы также можете создать отдельный тип или использовать null, просто что-то, что ясно указывает на отсутствие дальнейших данных. На стороне актера, вы бы сделали:

case _: Bar =>
  if (/*... there are more rows...*/) sender() ! Some((Row(), (context.self ? Bar()))
  else sender() ! None

На стороне потребителя:

def consumeRows(rowsFut: Future[Option[(Row,Future[_])]]): Unit = {
  rowsFut.foreach {
    _.foreach {
      case (row: Row, nextRowsFut: Future[Option[(Row,Future[_])]]) =>
        // Do something with row
        println(row)
        consumeRows(nextRowsFut)
  }
}
val rowsFut = ask(foo, Bar())(1 seconds).mapTo[Option[(Row,Future[_])]]
consumeRows(rowsFut)
0 голосов
/ 03 июля 2018

Как уже упоминалось в комментариях, шаблон запроса будет работать, только если вы отправите все строки в виде одного сообщения. Использование akka-stream является отличным решением для построения процессора для этого:

val barActor = system.actorOf(Props(new BarActor()))

def runQuery(): Future[Seq[Row]] = {
  // complete when EndOfQuery message is received 
  val runnableGraph = Source.actorRef[DataProtocol](Int.MaxValue, OverflowStrategy.fail)
    .takeWhile(elem => {
      elem match {
        case _:Row => true
        case _ | EndOfQuery => false
      }
    })
    .toMat(Sink.seq[DataProtocol])(Keep.both)

  // keep both the ActorRef and the Future[DataProtocol]
  val (actor, future) = runnableGraph.run()

  // issue query to actor
  barActor ! Query(actor)

  // only Row messages were emitted:
  future.map(_.asInstanceOf[Seq[Row]])
}

val data = Await.result(runQuery(), Duration.Inf)

Как вы упомянули в своем вопросе, вам понадобится стоп-сообщение.

...