таймер akka дает сообщение Отмена таймера [сообщение] с генерацией [583] - PullRequest
0 голосов
/ 15 февраля 2020

я использую таймер Акка

я использую потоков Twitter и я пытаюсь получить количество твитов в 5 секунд, вот мой код

case class PerSecond(tweet:Tweet)
case class TweetPerSecondCount(tweet:Tweet)

class TweetPerSecondActor extends Actor with Timers{
var counter=0
 def receive: PartialFunction[Any, Unit] = {
    case PerSecond(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message PerSecond")
      timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)

    case TweetPerSecondCount(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
      log.info("got the tweet {}",getCounter+"in 5 seconds")

    case message =>
      log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
      unhandled(message)
  }
}

в контроллере игровой платформы. Действие Я беру объекты твитов из потока твиттера (непрерывный поток, не наклоняя его)

    class Mycontroller extends Controller {

    val tweetPerSecondActor = system.actorOf......//create actor


    def tweetAveragePerSecond = Action {
        log.debug("in the action tweetAveragePerSecond")

        def getTweet: PartialFunction[StreamingMessage, Unit] = {
          case tweet: Tweet =>
            val future = ask(tweetPerSecondActor, PerSecond(tweet))
        }
        val streaming: Future[TwitterStream] = handleTwitterStreamClient.getStreamingCLient.sampleStatuses(stall_warnings = true)(getTweet)

        Ok("tweet average per second")
      }
    }

, когда я нажимаю на маршрут, журналы показывают

TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]

    6:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-6] TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
    16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
    16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]

и если я передаю фиктивные строковые значения вместо запуска потока и передачи объекта twitter, таймер работает нормально, как показано ниже: case case PerSecond (tweet: String) case class TweetPerSecondCount (tweet: String)

class TweetPerSecondActor extends Actor with Timers{
var counter=0
 def receive: PartialFunction[Any, Unit] = {
    case PerSecond(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message PerSecond")
      timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)

    case TweetPerSecondCount(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
      log.info("got the tweet {}",getCounter+"in 5 seconds")

    case message =>
      log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
      unhandled(message)
  }
}

    class Mycontroller extends Controller {

    val tweetPerSecondActor = system.actorOf......//create actor


    def tweetAveragePerSecond = Action {
        log.debug("in the action tweetAveragePerSecond")

            val future = ask(tweetPerSecondActor, PerSecond("dummy value"))


        Ok("tweet average per second")
      }
    }

1 Ответ

0 голосов
/ 15 февраля 2020

Таймеры в Akka имеют ключи (в вашем случае это ключ "perSecond"), и API гарантирует, что

Каждый таймер имеет ключ, и если новый таймер с тем же ключом запускается предыдущим отменяется

Так что каждый раз, когда вызывается функция getTweet (учитывая, что она вызывается для эффекта, а не для значения, я предпочитаю "процедуру", но это, возможно, особые c), таймер получит отменено.

В зависимости от того, что вы пытаетесь выполнить, sh решения могут включать:

  • Актер за запрос. Вам нужно, чтобы запрос возвращал что-то, что может быть передано будущим запросам, чтобы получить информацию от актера. Управление жизненным циклом актера также, вероятно, также будет желательным.
  • Сохраните одного актера на весь контроллер, но используйте уникальный ключ таймера для каждого запроса. Если это таймеры периодов c, вам придется отслеживать, какие ключи таймеров активны, и отменять те, которые больше не нужны (сохранение произвольного количества работающих таймеров периодов c может в конечном итоге ухудшить производительность)
...