Регулирующий актер, который хранит только самые новые сообщения - PullRequest
2 голосов
/ 26 марта 2019

Положение

Я использую актеров akka для обновления данных на моем веб-клиенте. Один из этих участников несет полную ответственность за рассылку обновлений, касающихся одиночных Agent с. Эти агенты обновляются очень быстро (каждые 10 мс). Моя цель сейчас состоит в том, чтобы регулировать этот механизм обновления, чтобы каждые 300 мс отправлялась самая новая версия каждого Agent.

Мой код

Это то, что я до сих пор придумал:

/**
  * Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
  */
class BroadcastSingleAgentActor extends Actor {

    private implicit val ec: ExecutionContextExecutor = context.dispatcher
    private var queue = Set[Agent]()

    context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
        queue.foreach { a =>
            broadcastAgent(self)(a) // sends the message to all connected clients
        }
        queue = Set()
    }

    override def receive: Receive = {
        // this message is received every 10 ms for every agent present
        case BroadcastAgent(agent) => 
           // only keep the newest version of the agent
           queue = queue.filter(_.id != agent.id) + agent
    }

}

Вопрос

Этот актер (BroadcastSingleAgentActor) работает, как и ожидалось, но я не уверен на 100%, является ли он потокобезопасным (обновляя queue, одновременно удаляя его). Кроме того, это не значит, что я лучше использую инструменты, которые предоставляет мне Акка. Я нашел эту статью (Throttling Messages в Akka 2), но моя проблема в том, что мне нужно сохранить самое новое сообщение Agent при удалении любой его старой версии. Есть ли пример где-то похожий на то, что мне нужно?

1 Ответ

1 голос
/ 26 марта 2019

Нет, это не потокобезопасно, потому что планирование через ActorSystem будет происходить в другом потоке, а не receive.Одна потенциальная идея состоит в том, чтобы выполнить планирование в методе receive, потому что входящие сообщения в BroadcastSingleAgentActor будут обрабатываться последовательно.

  override def receive: Receive = {

    case Refresh =>
      context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
        queue.foreach { a =>
          broadcastAgent(self)(a) // sends the message to all connected clients
        }
      }
      queue = Set()
    // this message is received every 10 ms for every agent present
    case BroadcastAgent(agent) =>
      // only keep the newest version of the agent
      queue = queue.filter(_.id != agent.id) + agent
  }
...