Что означает эквивалентный pipeTo для Akka? - PullRequest
0 голосов
/ 03 мая 2018

В настоящее время я пытаюсь переписать существующего нетипизированного актера в типизированный. Поскольку субъект взаимодействует с базой данных MySQL с использованием ScalikeJDBC, и поскольку я хотел бы сделать это асинхронно, я имею дело с Futures, выходящими из отдельного (неактивного) класса репозитория.

С нетипизированным Akka, в методе получения актера, я мог бы сделать это:

import akka.pattern.pipe
val horseList : Future[Seq[Horse]] = horseRepository.listHorses(...)
horseList pipeTo sender()

И актер-отправитель получит список лошадей. Я не могу понять, как сделать это внутри поведения, как:

val behaviour : Behavior[ListHorses] = Behaviors.receive { 
    (ctx,msg) => msg match {
        case ListHorses(replyTo) => 
            val horseListF : Future[Seq[Horse]] = horseRepository.listHorses(...)
            // -> how do I make horseListF's content end up at replyTo? <-
            Behaviors.same
    }
}

Паттерн канала не работает (так как он ожидает нетипизированный ActorRef), и до сих пор я не нашел ничего другого в зависимости akka-actor-typed (2.5.12), которую я использую для этой работы.

Как мне это сделать?

Ответы [ 3 ]

0 голосов
/ 16 апреля 2019

Вы можете просто отправить сообщение на replyTo, когда будущее завершится успешно:

case ListHorses(replyTo) => 
    horseRepository.listHorses(...) foreach { horses => replyTo ! horses }
    Behaviors.same

Или, если вы также хотите обрабатывать ошибки:

case ListHorses(replyTo) =>
    horseRepository.listHorses(...) onComplete { 
        case Success(horses) => replyTo ! horses
        case Failure(e) => // error handling 
    }
    Behaviors.same

Чтобы это работало, вам нужно ExecutionContext. Обычно имеет смысл использовать тот же актер, что и для актера, поэтому сначала вам нужно будет сделать его доступным для onComplete или foreach:

implicit val ec = ctx.executionContext
0 голосов
/ 15 мая 2019

В Акке 2.5.22 (может раньше) есть context.pipeToSelf:

  def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit

Вы все еще должны предоставить соответствие шаблону для Success и Failure, которое в моем коде я уменьшил с помощью этого сахара:

def mapPipe[A, T](success: A => T, failure: Throwable => T): Try[A] => T = {
  case Success(value) => success(value)
  case Failure(e) => failure(e)
}

В результате вызова, как это:

case class Horses(horses: Seq[Horse]) extends Command
case class HorseFailure(e: Throwable) extends Command

...

context.pipeToSelf(horseList) {
  mapPipe(Horses,HorseFailure)
}
0 голосов
/ 03 мая 2018

Почему вы не используете будущий обратный вызов для отправки своего сообщения обратно? Посмотрите на этот пример, возможно, вы можете использовать симлиарное приближение:

import akka.NotUsed
import akka.typed.{ActorRef, ActorSystem, Behavior}
import akka.typed.scaladsl.Actor

import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global

sealed trait Response
case class Message(msg: String) extends Response

case class Greet(whom: String, replayTo: ActorRef[Response])

object Methods {
  def GetRecipient : Future[String] = Future { "Me" }
}

object Greeter {
  import Methods._
  import akka.typed.scaladsl.Actor

  val behavior =
    Actor.immutable[Greet] { (ctx, greet) =>
      println(greet)
      GetRecipient onComplete {
        case Success(str) => {
          // Use the future call back instad the pipeTo
          greet.replayTo ! Message("Hi!")
        }
        case Failure(err) => greet.replayTo ! Message("Error")
      }
      Actor.same
    }
}

object Man extends App {

  import Greeter._
  import scala.concurrent.duration._

  val main: Behavior[Response] = {
    Actor.deferred[Response] { ctx =>
      val enricherRef = ctx.spawn(behavior, "greet")
      enricherRef ! Greet("hey", ctx.self)

      Actor.immutable[Response] {
        case (ctx, m: Response) => {
          println(m)
          Actor.same
        }
      }
    }
  }

  val system = ActorSystem( "GreetDemo", main)

  Thread.sleep(5000)
}

В этом примере отправляется только сообщение новому порожденному субъекту, но в вашем случае я использую нового субъекта, например, для каждого запроса.

...