Я знаком с Erlang / Elixir, в котором сообщения, находящиеся в почтовом ящике процесса, остаются в почтовом ящике до тех пор, пока они не будут сопоставлены:
Шаблоны Pattern
последовательно сопоставляются спервое сообщение по времени в почтовом ящике, затем второе и так далее.Если совпадение выполнено успешно и необязательная защитная последовательность GuardSeq
имеет значение true, соответствующий Body
оцениваетсяСоответствующее сообщение используется, то есть удаляется из почтового ящика, в то время как любые другие сообщения в почтовом ящике остаются неизменными.
(http://erlang.org/doc/reference_manual/expressions.html#receive)
Однако с несоответствующими сообщениями Akka Actorsудаляются из почтового ящика. Это раздражает при внедрении, например, вилок в симуляцию столовых философов:
import akka.actor._
object Fork {
def props(id: Int): Props = Props(new Fork(id))
final case class Take(philosopher: Int)
final case class Release(philosopher: Int)
final case class TookFork(fork: Int)
final case class ReleasedFork(fork: Int)
}
class Fork(val id: Int) extends Actor {
import Fork._
object Status extends Enumeration {
val FREE, TAKEN = Value
}
private var _status: Status.Value = Status.FREE
private var _held_by: Int = -1
def receive = {
case Take(philosopher) if _status == Status.FREE => {
println(s"\tPhilosopher $philosopher takes fork $id.")
take(philosopher)
sender() ! TookFork(id)
context.become(taken, false)
}
case Release(philosopher) if _status == Status.TAKEN && _held_by == philosopher => {
println(s"\tPhilosopher $philosopher puts down fork $id.")
release()
sender() ! ReleasedFork(id)
context.unbecome()
}
}
def take(philosopher: Int) = {
_status = Status.TAKEN
_held_by = philosopher
}
def release() = {
_status = Status.FREE
_held_by = -1
}
}
Когда на вилку отправляется сообщение Take(<philosopher>)
, мы хотим, чтобы оно оставалось в почтовом ящике.до тех пор, пока разветвление не будет освобождено и сообщение не будет найдено. Однако в Akka Take(<philosopher>)
сообщения удаляются из почтового ящика, если разветвление в настоящее время занято, поскольку совпадения не найдено.
В настоящее время я решаю эту проблему с помощьюпереопределяя метод unhandled
актера Fork и снова перенаправляя сообщение на вилку:
override def unhandled(message: Any): Unit = {
self forward message
}
Я считаю, что это ужасно неэффективно, поскольку оно продолжает посылать сообщение на вилку, пока оно не будет найдено.Еще один способ решить эту проблему, который не включает в себя непрерывную пересылку непревзойденных сообщений?
В этом наихудшем случае мне придется реализовать собственный тип почтового ящика, который имитирует почтовые ящики Erlang, как описано здесь: http://ndpar.blogspot.com/2010/11/erlang-explained-selective-receive.html
РЕДАКТИРОВАТЬ: я изменил свою реализацию на основе совета Тима и использую черту Stashкак предложено.Мой Fork
актер теперь выглядит следующим образом:
class Fork(val id: Int) extends Actor with Stash {
import Fork._
// Fork is in "taken" state
def taken(philosopher: Int): Receive = {
case Release(`philosopher`) => {
println(s"\tPhilosopher $philosopher puts down fork $id.")
sender() ! ReleasedFork(id)
unstashAll()
context.unbecome()
}
case Take(_) => stash()
}
// Fork is in "free" state
def receive = {
case Take(philosopher) => {
println(s"\tPhilosopher $philosopher takes fork $id.")
sender() ! TookFork(id)
context.become(taken(philosopher), false)
}
}
}
Однако я не хочу писать вызовы stash()
и unstashAll()
везде.Вместо этого я хочу реализовать пользовательский тип почтового ящика, который делает это для меня, то есть хранит необработанные сообщения и снимает их, когда актер обрабатывает сообщение.Возможно ли это?
Я попытался создать собственный почтовый ящик, который делает это, однако я не могу определить, соответствует ли сообщение блоку приема или не соответствует ему.