Акка: Хранить непревзойденные сообщения в почтовом ящике - PullRequest
0 голосов
/ 19 декабря 2018

Я знаком с Erlang / Elixir, в котором сообщения, находящиеся в почтовом ящике процесса, остаются в почтовом ящике до тех пор, пока они не будут сопоставлены:

Шаблоны Pattern последовательно сопоставляются спервое сообщение по времени в почтовом ящике, затем второе и так далее.Если совпадение выполнено успешно и необязательная защитная последовательность GuardSeq имеет значение true, соответствующий Body оцениваетсяСоответствующее сообщение используется, то есть удаляется из почтового ящика, в то время как любые другие сообщения в почтовом ящике остаются неизменными.

(http://erlang.org/doc/reference_manual/expressions.html#receive)

Однако с несоответствующими сообщениями Akka Actorsудаляются из почтового ящика. Это раздражает при внедрении, например, вилок в симуляцию столовых философов:

import akka.actor._

object Fork {
  def props(id: Int): Props = Props(new Fork(id))
  final case class Take(philosopher: Int)
  final case class Release(philosopher: Int)
  final case class TookFork(fork: Int)
  final case class ReleasedFork(fork: Int)
}

class Fork(val id: Int) extends Actor {
  import Fork._

  object Status extends Enumeration {
    val FREE, TAKEN = Value
  }

  private var _status: Status.Value = Status.FREE
  private var _held_by: Int = -1

  def receive = {
    case Take(philosopher) if _status == Status.FREE => {
      println(s"\tPhilosopher $philosopher takes fork $id.")
      take(philosopher)
      sender() ! TookFork(id)
      context.become(taken, false)
    }
    case Release(philosopher) if _status == Status.TAKEN && _held_by == philosopher => {
      println(s"\tPhilosopher $philosopher puts down fork $id.")
      release()
      sender() ! ReleasedFork(id)
      context.unbecome()
    }
  }

  def take(philosopher: Int) = {
    _status  = Status.TAKEN
    _held_by = philosopher
  }

  def release() = {
    _status  = Status.FREE
    _held_by = -1
  }
}

Когда на вилку отправляется сообщение Take(<philosopher>), мы хотим, чтобы оно оставалось в почтовом ящике.до тех пор, пока разветвление не будет освобождено и сообщение не будет найдено. Однако в Akka Take(<philosopher>) сообщения удаляются из почтового ящика, если разветвление в настоящее время занято, поскольку совпадения не найдено.

В настоящее время я решаю эту проблему с помощьюпереопределяя метод unhandled актера Fork и снова перенаправляя сообщение на вилку:

override def unhandled(message: Any): Unit = {
  self forward message
}

Я считаю, что это ужасно неэффективно, поскольку оно продолжает посылать сообщение на вилку, пока оно не будет найдено.Еще один способ решить эту проблему, который не включает в себя непрерывную пересылку непревзойденных сообщений?

В этом наихудшем случае мне придется реализовать собственный тип почтового ящика, который имитирует почтовые ящики Erlang, как описано здесь: http://ndpar.blogspot.com/2010/11/erlang-explained-selective-receive.html


РЕДАКТИРОВАТЬ: я изменил свою реализацию на основе совета Тима и использую черту Stashкак предложено.Мой Fork актер теперь выглядит следующим образом:

class Fork(val id: Int) extends Actor with Stash {
  import Fork._

  // Fork is in "taken" state
  def taken(philosopher: Int): Receive = {
    case Release(`philosopher`) => {
      println(s"\tPhilosopher $philosopher puts down fork $id.")
      sender() ! ReleasedFork(id)
      unstashAll()
      context.unbecome()
    }
    case Take(_) => stash()
  }

  // Fork is in "free" state
  def receive = {
    case Take(philosopher) => {
      println(s"\tPhilosopher $philosopher takes fork $id.")
      sender() ! TookFork(id)
      context.become(taken(philosopher), false)
    }
  }
}

Однако я не хочу писать вызовы stash() и unstashAll() везде.Вместо этого я хочу реализовать пользовательский тип почтового ящика, который делает это для меня, то есть хранит необработанные сообщения и снимает их, когда актер обрабатывает сообщение.Возможно ли это?

Я попытался создать собственный почтовый ящик, который делает это, однако я не могу определить, соответствует ли сообщение блоку приема или не соответствует ему.

Ответы [ 2 ]

0 голосов
/ 19 декабря 2018

Существует пример проекта в репозитории Akka, в котором размещено несколько реализаций "Обедающих философов" .Ключевое различие между вашим подходом и их подходом состоит в том, что они применяют и посуду, и философов как актеров, тогда как вы определяете только посуду как актера.В примерах реализации показано, как смоделировать проблему, не обрабатывая необработанные сообщения и не используя собственный почтовый ящик.

0 голосов
/ 19 декабря 2018

Проблема с forward состоит в том, что он может переупорядочивать сообщения, если есть несколько сообщений, ожидающих обработки, что, вероятно, не очень хорошая идея.

Наилучшим решением здесь может показатьсяреализовать собственную очередь внутри актера, которая дает семантику, которую вы хотите.Если вы не можете обработать сообщение немедленно, поместите его в очередь, а когда приходит следующее сообщение, вы можете обработать как можно большую часть очереди.Это также позволит вам определить, когда отправители отправляют несогласованные сообщения (например, Release на развилке, которую они не Take), которая в противном случае будет просто накапливаться во входящем почтовом ящике.

Я не буду беспокоиться одо тех пор, пока вы не докажете, что это проблема, но она будет более эффективной, если каждая функция приема обрабатывает только те сообщения, которые имеют отношение к этому конкретному состоянию.


Я бы не использовал var вактер, помещая состояние в параметрах в методы receive.И значение _status подразумевается в выборе обработчика получения и не нуждается в сохранении в качестве значения.Обработчик приема taken должен обрабатывать только Release сообщений, а основной обработчик приема - только Take сообщений.

...