Отправка на канал мертвого письма Акки от многих других актеров в случае отказа - PullRequest
0 голосов
/ 13 мая 2018

Новый для Scala + Akka здесь.У меня есть небольшая сеть актеров, которые посылают разные сообщения друг другу.Многим из них нужен доступ к моему DeadLetterChannel субъекту на случай, если они получат сообщение о том, что не знают, как с ним справиться:

class DeadLetterChannel(val queue : mutable.Queue[Any]) extends Actor with Logging {
  override def receive: Receive = {
    case any : Any =>
      log.info(s"${this.getClass} just received a message of type ${} from ${sender().getClass}.")
      queue.enqueue(any)
    case _ =>
      log.warn(s"${this.getClass} just received a non-Any message.")
  }
}

Затем изнутри многие другие субъекты:

class DeviceManager(val dlc : DeadLetterChannel) extends ActorRef {
  override def receive: Receive = {
    case Heartbeat =>
      // Handle Heartbeat here...
    case Connect:
      // Handle Connect here...
    case Disconnect:
      // Handle Disconnect here...
    case _ =>
      dlc ! ???
  }
}

У меня есть две проблемы:

  • Я получаю сообщение об ошибке компилятора при отправке сообщения (в частности, перегрузка !: " Не удается разрешить символ! ");и
  • Понятия не имею, что отправить на dlc в случае _, есть идеи?Очевидно, я просто пытаюсь отправить сообщение, полученное DeviceManager, которое не является типом Heartbeat, Connect или Disconnect
.

Ответы [ 2 ]

0 голосов
/ 13 мая 2018

Классы актеров должны расширяться Actor вместо ActorRef.Также используйте ActorRef для ссылки на актера.Следовательно, DeviceManager следует определить следующим образом:

class DeviceManager(dlc: ActorRef) extends Actor {
  // ...
}

Идея заключается в том, что вы будете использовать ActorRef актера DeadLetterChannel при создании актеров устройства.Например:

val dlc: ActorRef = context.actorOf(/* .. */)
val device1: ActorRef = context.actorOf(Props(classOf[DeviceManager], dlc)
val device2: ActorRef = context.actorOf(Props(classOf[DeviceManager], dlc)

Затем вы можете сделать так, как предложил Тим в своем ответе, захватить случай по умолчанию в блоке receive и отправить это сообщение на dlc:

class DeviceManager(dlc: ActorRef) extends Actor {
  def receive = {
    /* other case clauses */
    case msg =>
      dlc ! msg
  }
}

Кроме того, вместо передачи изменяемой очереди в DeadLetterChannel в качестве аргумента конструктора, инкапсулируйте это состояние внутри субъекта в виде неизменяемой очереди и выставляйте его только через передачу сообщений:

class DeadLetterChannel extends Actor with akka.actor.ActorLogging {
  var queue = collection.immutable.Queue[Any](0)

  def receive = {
    case GetMessages => // GetMessages is a custom case object
      sender() ! queue
    case msg =>
      val s = sender()
      log.info(s"DLC just received the following message from [$s]: $msg")
      queue = queue.enqueue(msg)
  }
}

Другой подходзаключается в том, чтобы не определять регистр по умолчанию (т. е. пропустить предложение case msg =>) и использовать механизмы Akka для работы с необработанными сообщениями.Ниже приведены несколько параметров:

  1. Включить встроенную запись для необработанных сообщений , которая регистрирует эти сообщения на уровне отладки:

    akka {
      actor {
        debug {
          # enable DEBUG logging of unhandled messages
          unhandled = on
        }
      }
    }
    

    Этот подход, пожалуй, самый простой, если вы просто хотите регистрировать необработанные сообщения.

  2. Как указано в документации :

    Если текущее поведение актера не соответствует полученному сообщению ... по умолчанию публикует akka.actor.UnhandledMessage(message, sender, recipient) в потоке событий системы актера.

    В свете этого вы можете создать актера, который подписывается напоток событий и обрабатывает akka.actor.UnhandledMessage сообщений.Такой актер может выглядеть следующим образом:

    import akka.actor.{ Actor, UnhandledMessage, Props }
    
    class UnhandledMessageListener extends Actor {
      def receive = {
        case UnhandledMessage(msg, sender, recipient) =>
          // do something with the message, such as log it and/or something else
      }
    }
    
    val listener = system.actorOf(Props[UnhandledMessageListener])
    system.eventStream.subscribe(listener, classOf[UnhandledMessage])
    

    Более подробную информацию о потоке событий и создании подписчика на этот поток можно найти здесь .

0 голосов
/ 13 мая 2018

Возможно, вы упростили код для своего примера, но вы не можете просто отправить сообщение экземпляру класса Actor. Вам необходимо создать именованного актера, используя actorOf на ActorSystem, а затем использовать actorSelection в этой системе, чтобы получить указатель на актера.

Для второй части вашего вопроса, просто введите значение соответствия в case и отправьте результат:

case msg =>
  dlc ! msg

Кроме того, вы можете использовать другое имя для своего класса, потому что мертвая буква - это сообщение, которое не может быть доставлено , а не сообщение, которое может ' не может быть обработано получателем.

...