Akka Actor - pipeTo - Нецелесообразно ли использовать значения из полученного сообщения в ответе по каналу? - PullRequest
0 голосов
/ 03 мая 2019

Я обрабатываю Future в Actor с шаблоном pipeTo, который, кажется, работает нормально.

В приведенном ниже примере UserProxyActor просит UserActivityActor с сообщением Get(userId).

Я хочу включить параметры сообщения Get в ответ, чтобы получающий субъект имел все необходимое для обработки сообщения. Например, вставьте действия в БД с соответствующим идентификатором пользователя.

  1. Доступен ли userId в вызове map или он "закрыт"?
  2. Это сработает, потому что шаблон запроса будет блокироваться?
  3. Есть ли какой-нибудь более приятный способ сделать это, с чем я не сталкивался?
class UserActivityActor(repository: UserActivityRepository) extends Actor {
  import akka.pattern.pipe
  import UserActivityActor._
  implicit val ec: ExecutionContext = context.dispatcher

  def receive = {
    case Get(userId) =>
      // user's historical activities are retrieved
      // via the separate repository
      repository.queryHistoricalActivities(userId)
        .map(a => UserActivityReceived(userId, a))      // wrap the completed future value in a message
        .recover{case ex => RepoFailure(ex.getMessage)} // wrap failure in a local message type
        .pipeTo(sender())


class UserProxyActor(userActivities: ActorRef) extends Actor {
  import UserProxyActor._
  import akka.pattern.{ ask, pipe }

  implicit val ec: ExecutionContext = context.dispatcher
  implicit val timeout = Timeout(5 seconds)

  def receive = {
    case GetUserActivities(user) =>
      (userActivities ? UserActivityActor.Get(user))
        .pipeTo(sender())
  }
}

Ответы [ 2 ]

1 голос
/ 03 мая 2019

Доступен ли идентификатор пользователя в вызове карты или он «закрыт»?

Get должен быть неизменным, если да userId будет доступно.

Будет ли это работать, потому что шаблон запроса будет блокироваться?

Актер получает сообщение Get, создает Future и затем обрабатывает другое сообщение.Никаких блокировок вообще.Спросите будущее не будет завершено, пока будущее не будет завершено или не истечет время ожидания.

Есть ли какой-нибудь более приятный способ сделать это, с которым я не сталкивался?

Выглядит хорошо, если repository.queryHistoricalActivities(userId) не блокирует вызов.

0 голосов
/ 03 мая 2019

Не думаю, что с тобой что-то не так.

Единственное, что я хочу упомянуть, это то, что я лично предпочитаю не использовать ask при общении между актерами (и имейте в виду, что это в основном личное предпочтение), поэтому я бы сделал что-то вроде этого:

class UserActivityActor(repository: UserActivityRepository) extends Actor {
  import akka.pattern.pipe
  import UserActivityActor._
  implicit val ec: ExecutionContext = context.dispatcher

  def receive = {
    case Get(userId) =>
      // user's historical activities are retrieved
      // via the separate repository
      repository.queryHistoricalActivities(userId)
        .map(a => UserActivityReceived(userId, a))      // wrap the completed future value in a message
        .recover{case ex => RepoFailure(ex.getMessage)} // wrap failure in a local message type
        .pipeTo(sender())


class UserProxyActor(userActivities: ActorRef) extends Actor {
  import UserProxyActor._

  def receive = {
    case GetUserActivities(user, sender()) =>
      userActivities.forward(UserActivityActor.Get(user))
  }
}

Это, однако, устраняет поведение тайм-аута (в исходной реализации запрашивающий субъект будет ожидать не более 5 секунд или получит сбой, в моем случае он может ждать бесконечно).

...