Положение
Я использую актеров akka для обновления данных на моем веб-клиенте. Один из этих участников несет полную ответственность за рассылку обновлений, касающихся одиночных Agent
с. Эти агенты обновляются очень быстро (каждые 10 мс). Моя цель сейчас состоит в том, чтобы регулировать этот механизм обновления, чтобы каждые 300 мс отправлялась самая новая версия каждого Agent
.
Мой код
Это то, что я до сих пор придумал:
/**
* Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
*/
class BroadcastSingleAgentActor extends Actor {
private implicit val ec: ExecutionContextExecutor = context.dispatcher
private var queue = Set[Agent]()
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
queue = Set()
}
override def receive: Receive = {
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}
}
Вопрос
Этот актер (BroadcastSingleAgentActor
) работает, как и ожидалось, но я не уверен на 100%, является ли он потокобезопасным (обновляя queue
, одновременно удаляя его). Кроме того, это не значит, что я лучше использую инструменты, которые предоставляет мне Акка. Я нашел эту статью (Throttling Messages в Akka 2), но моя проблема в том, что мне нужно сохранить самое новое сообщение Agent
при удалении любой его старой версии. Есть ли пример где-то похожий на то, что мне нужно?