Я создаю службу на основе актера в Scala, где потребители могут запрашивать, авторизованы ли клиенты, а также могут авторизовать клиентов.
Если потребитель запрашивает состояние авторизации клиента, а этот клиент - нетАвторизовавшись, актер должен дождаться входящих Authorize
сообщений в течение указанного времени и затем отправить ответ.IsAuthorized
должен иметь возможность выполняться синхронно в коде потребителя, чтобы он блокировал и ожидал ответа.Что-то вроде
service !? IsAuthorized(client) => {
case IsAuthorizedResponse(_, authorized) => // do something
}
Однако receiveWithin()
в моем актере никогда не получает сообщения и всегда сталкивается с таймаутом.
Вот мой код
case object WaitingForAuthorization
case class WaitingForAuthorizationResponse(clients: immutable.Set[Client])
case class IsAuthorized(client: Client)
case class IsAuthorizedResponse(client: Client, authorized: Boolean)
case class Authorize(client: Client)
class ClientAuthorizationService {
private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
def actor = Actor.actor {
loop {
react {
case IsAuthorized(client: Client) => reply {
if (authorized contains client) {
IsAuthorizedResponse(client, true)
} else {
waiting += client
var matched = false;
val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT)
while (!matched && Instant.now.isBefore(end)) {
// ERROR HERE: Never receives Authorize messages
receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
case Authorize(authorizedClient: Client) => {
authorizeClient(authorizedClient)
if (authorizedClient == client) matched = true
}
case TIMEOUT => // do nothing since we handle the timeout in the while loop
}
}
IsAuthorizedResponse(client, matched)
}
}
case Authorize(client: Client) => authorizeClient(client)
case WaitingForAuthorization => reply {
WaitingForAuthorizationResponse(immutable.Set() ++ waiting)
}
}
}
}
private def authorizeClient(client: Client) = synchronized {
authorized += client
waiting -= client
}
}
object ClientAuthorizationService {
val AUTH_TIMEOUT: Long = 60 * 1000;
}
Когда я 'm отправляя сообщение Authorize
субъекту, пока он находится в блоке receiveWithin, сообщения перехватываются вторым оператором case ниже, который должен фактически перехватывать эти сообщения только тогда, когда никто не ждет ответа в это время.
Что не так с моим кодом?
Обновление:
Вот сокращенная версия соответствующего кода, которая на самом деле представляет собой гораздо более простую и отличающуюся логику, но, возможно, лучше проясняет проблему:
loop {
react {
case IsAuthorized(client: Client) => reply {
var matched = false
// In the "real" logic we would actually loop here until either the
// authorized client matches the requested client or the timeout is hit.
// For the sake of the demo we only take the first Authorize message.
receiveWithin(60*1000) {
// Although Authorize is send to actor it's never caught here
case Authorize(authorizedClient: Client) => matched = authorizedClient == client
case TIMEOUT =>
}
IsAuthorizedResponse(client, matched)
}
case Authorize(client: Client) => // this case is hit
}
}
Обновление 2:
Я наконец решил проблему.Я думаю, что проблема заключалась в том, что актер блокировал при попытке получить сообщение Authorize
в ответе на предыдущее сообщение IsAuthorized
.
Я переписал код, чтобы анонимный Актер запускался, когда мы 'В ожидании Authorized
.Вот код для тех, кому это интересно.waiting
- это Map[Client, Actor]
.
loop {
react {
case IsAuthorized(client: Client) =>
if (authorized contains client) {
sender ! IsAuthorizedResponse(client, true)
} else {
val receipient = sender
// Start an anonymous actor that waits for an Authorize message
// within a given timeout and sends a reply to the consumer.
// The actor will be notified by the parent actor below.
waiting += client -> Actor.actor {
val cleanup = () => {
waiting -= client
exit()
}
receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
case Authorize(c) =>
receipient ! IsAuthorizedResponse(client, true)
cleanup()
case TIMEOUT =>
receipient ! IsAuthorizedResponse(client, false)
cleanup()
}
}
}
case Authorize(client: Client) =>
authorized += client
waiting.get(client) match {
case Some(actor) => actor ! Authorize(client)
case None =>
}
case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
}
}
Если есть более эффективные способы решения этой проблемы, пожалуйста, сообщите мне!