Scala actor: receiveWithin () не получает сообщения - PullRequest
3 голосов
/ 11 октября 2011

Я создаю службу на основе актера в Scala, где потребители могут запрашивать, авторизованы ли клиенты, а также могут авторизовать клиентов.

Если потребитель запрашивает состояние авторизации клиента, а этот клиент - нетАвторизовавшись, актер должен дождаться входящих Authorize сообщений в течение указанного времени и затем отправить ответ.IsAuthorized должен иметь возможность выполняться синхронно в коде потребителя, чтобы он блокировал и ожидал ответа.Что-то вроде

service !? IsAuthorized(client) => {
  case IsAuthorizedResponse(_, authorized) => // do something
}

Однако receiveWithin() в моем актере никогда не получает сообщения и всегда сталкивается с таймаутом.

Вот мой код

case object WaitingForAuthorization
case class WaitingForAuthorizationResponse(clients: immutable.Set[Client])
case class IsAuthorized(client: Client)
case class IsAuthorizedResponse(client: Client, authorized: Boolean)
case class Authorize(client: Client)

class ClientAuthorizationService {
  private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
  private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]

  def actor = Actor.actor {
    loop {
      react {
        case IsAuthorized(client: Client) => reply {
          if (authorized contains client) {
            IsAuthorizedResponse(client, true)
          } else {
            waiting += client
            var matched = false;
            val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT)

            while (!matched && Instant.now.isBefore(end)) {
              // ERROR HERE: Never receives Authorize messages
              receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
                case Authorize(authorizedClient: Client) => {
                  authorizeClient(authorizedClient)
                  if (authorizedClient == client) matched = true
                }
                case TIMEOUT => // do nothing since we handle the timeout in the while loop
              }
            }

            IsAuthorizedResponse(client, matched)
          }
        }

        case Authorize(client: Client) => authorizeClient(client)
        case WaitingForAuthorization => reply {
          WaitingForAuthorizationResponse(immutable.Set() ++ waiting)
        }
      }
    }
  }

  private def authorizeClient(client: Client) = synchronized {
    authorized += client
    waiting -= client
  }
}

object ClientAuthorizationService {
  val AUTH_TIMEOUT: Long = 60 * 1000;
}

Когда я 'm отправляя сообщение Authorize субъекту, пока он находится в блоке receiveWithin, сообщения перехватываются вторым оператором case ниже, который должен фактически перехватывать эти сообщения только тогда, когда никто не ждет ответа в это время.

Что не так с моим кодом?

Обновление:

Вот сокращенная версия соответствующего кода, которая на самом деле представляет собой гораздо более простую и отличающуюся логику, но, возможно, лучше проясняет проблему:

loop {
  react {
    case IsAuthorized(client: Client) => reply {
      var matched = false

      // In the "real" logic we would actually loop here until either the
      // authorized client matches the requested client or the timeout is hit.
      // For the sake of the demo we only take the first Authorize message.

      receiveWithin(60*1000) {
        // Although Authorize is send to actor it's never caught here
        case Authorize(authorizedClient: Client) => matched = authorizedClient == client
        case TIMEOUT => 
      }

      IsAuthorizedResponse(client, matched)
    }

    case Authorize(client: Client) => // this case is hit
  }
}

Обновление 2:

Я наконец решил проблему.Я думаю, что проблема заключалась в том, что актер блокировал при попытке получить сообщение Authorize в ответе на предыдущее сообщение IsAuthorized.

Я переписал код, чтобы анонимный Актер запускался, когда мы 'В ожидании Authorized.Вот код для тех, кому это интересно.waiting - это Map[Client, Actor].

loop {
  react {
    case IsAuthorized(client: Client) =>
      if (authorized contains client) {
        sender ! IsAuthorizedResponse(client, true)
      } else {
        val receipient = sender
        // Start an anonymous actor that waits for an Authorize message
        // within a given timeout and sends a reply to the consumer.
        // The actor will be notified by the parent actor below.
        waiting += client -> Actor.actor {
          val cleanup = () => {
            waiting -= client
            exit()
          }

          receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
            case Authorize(c) =>
              receipient ! IsAuthorizedResponse(client, true)
              cleanup()
            case TIMEOUT =>
              receipient ! IsAuthorizedResponse(client, false)
              cleanup()
          }
        }
      }

    case Authorize(client: Client) =>
      authorized += client

      waiting.get(client) match {
        case Some(actor) => actor ! Authorize(client)
        case None =>
      }

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
  }
}

Если есть более эффективные способы решения этой проблемы, пожалуйста, сообщите мне!

Ответы [ 2 ]

0 голосов
/ 11 октября 2011

Я наконец решил проблему.Я думаю, что проблема заключалась в том, что актер блокировал при попытке получить сообщение Authorize в ответе на предыдущее сообщение IsAuthorized.

Я переписал код так, чтобы анонимный Актер запускался, когдаВ ожидании Authorized.Вот код для тех, кому это интересно.waiting является Map[Client, Actor].

loop {
  react {
    case IsAuthorized(client: Client) =>
      if (authorized contains client) {
        sender ! IsAuthorizedResponse(client, true)
      } else {
        val receipient = sender
        // Start an anonymous actor that waits for an Authorize message
        // within a given timeout and sends a reply to the consumer.
        // The actor will be notified by the parent actor below.
        waiting += client -> Actor.actor {
          val cleanup = () => {
            waiting -= client
            exit()
          }

          receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
            case Authorize(c) =>
              receipient ! IsAuthorizedResponse(client, true)
              cleanup()
            case TIMEOUT =>
              receipient ! IsAuthorizedResponse(client, false)
              cleanup()
          }
        }
      }

    case Authorize(client: Client) =>
      authorized += client

      waiting.get(client) match {
        case Some(actor) => actor ! Authorize(client)
        case None =>
      }

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
  }
}

Если есть более эффективные способы решения этой проблемы, пожалуйста, сообщите мне!

0 голосов
/ 11 октября 2011

Не ответить на проблему? В

case IsAuthorized(client: Client) => reply { ... }

весь код находится в аргументе блока ответа, поэтому он выполняется (включая receiveWithing) до того, как ответ действительно будет отправлен. Это означает, что когда ваш клиент обработает ваш ответ, вы больше не будете ждать его.

В вашем исходном коде это должно быть что-то вроде

case IsAuthorized(client: Client) =>
  if(ok) reply(AuthorizedReply(client, true))
  else {
     reply(AuthorizedReply(client, false))
     receiveWithin(...)
  }
...