Спящие актеры? - PullRequest
       30

Спящие актеры?

16 голосов
/ 04 августа 2009

Как лучше всего спать актеру? У меня есть актеры, настроенные как агенты, которые хотят поддерживать разные части базы данных (включая получение данных из внешних источников). По ряду причин (включая отсутствие перегрузки базы данных или связи и общие проблемы с загрузкой) я хочу, чтобы актеры спали между каждой операцией. Я смотрю на что-то вроде 10 актерских объектов.

Актеры будут работать в значительной степени бесконечно, так как всегда будут поступать новые данные или сидеть в таблице, ожидая распространения в другие части базы данных и т. Д. Идея состоит в том, чтобы база данных была максимально полной в любой момент времени.

Я мог бы сделать это с бесконечным циклом и сном в конце каждого цикла, но согласно http://www.scala -lang.org / node / 242 актеры используют пул потоков, который расширяется всякий раз, когда все потоки заблокированы. Так что я думаю, что Thread.sleep в каждом действующем субъекте был бы плохой идеей, так как излишне тратил бы нити.

Возможно, у меня может быть центральный субъект с собственным циклом, который отправляет сообщения подписчикам на часах (например, наблюдатели асинхронных событий)?

Кто-нибудь делал что-нибудь подобное или есть предложения? Извините за дополнительную (возможно, лишнюю) информацию.

Приветствия

Джо

Ответы [ 4 ]

21 голосов
/ 04 августа 2009

В первом ответе был Эрланг, но он, похоже, исчез. Вы можете легко сделать тот же трюк в стиле Эрланга с актерами Scala. Например. давайте создадим планировщик, который не использует потоки:

import actors.{Actor,TIMEOUT}

def scheduler(time: Long)(f: => Unit) = {
  def fixedRateLoop {
    Actor.reactWithin(time) {
      case TIMEOUT => f; fixedRateLoop
      case 'stop => 
    }
  }
  Actor.actor(fixedRateLoop)
}

И давайте проверим это (я сделал это правильно в Scala REPL), используя актера клиента теста:

case class Ping(t: Long)

import Actor._
val test = actor { loop {
  receiveWithin(3000) {
    case Ping(t) => println(t/1000)
    case TIMEOUT => println("TIMEOUT")
    case 'stop => exit
  }
} }

Запустить планировщик:

import compat.Platform.currentTime
val sched = scheduler(2000) { test ! Ping(currentTime) }

и вы увидите что-то вроде этого

scala> 1249383399
1249383401
1249383403
1249383405
1249383407

, что означает, что наш планировщик отправляет сообщение каждые 2 секунды, как и ожидалось. Остановим планировщик:

sched ! 'stop

тестовый клиент начнет сообщать о таймаутах:

scala> TIMEOUT
TIMEOUT
TIMEOUT

остановите это также:

test ! 'stop
17 голосов
/ 04 августа 2009

Нет необходимости явно вызывать актера в спящий режим: использование loop и react для каждого актора означает, что базовый пул потоков будет иметь ожидающие потоки, в то время как для актеров нет сообщений для обработки.

В случае, если вы хотите запланировать события для ваших актеров для обработки, это довольно легко, используя однопоточный планировщик из утилит java.util.concurrent:

object Scheduler {
  import java.util.concurrent.Executors
  import scala.compat.Platform
  import java.util.concurrent.TimeUnit
  private lazy val sched = Executors.newSingleThreadScheduledExecutor();
  def schedule(f: => Unit, time: Long) {
    sched.schedule(new Runnable {
      def run = f
    }, time , TimeUnit.MILLISECONDS);
  }
}

Вы можете расширить это, чтобы выполнять периодические задачи, и это можно использовать так:

val execTime = //...  
Scheduler.schedule( { Actor.actor { target ! message }; () }, execTime)

Тогда вашему целевому субъекту просто необходимо реализовать соответствующий цикл react для обработки данного сообщения. Вам не нужно спать с актером.

4 голосов
/ 07 августа 2009

ActorPing (Лицензия Apache) от lift-util имеет расписание и расписание. Источник: AttFixedRate: ActorPing.scala

Из скаладока:

Объект ActorPing планирует, что актер отправляет ping-сообщение с заданным сообщением через определенные промежутки времени. Методы расписания возвращают объект ScheduledFuture, который при необходимости можно отменить

2 голосов
/ 12 ноября 2011

К сожалению, в ответе oxbow_lakes есть две ошибки.

Одна - это простая ошибка объявления (долгое время против времени: длинная), но вторая более тонкая.

oxbow_lakes объявляет запуск как

def run = actors.Scheduler.execute(f) 

Это, однако, приводит к тому, что сообщения время от времени исчезают. То есть: они запланированы, но никогда не отправляются. Объявить пробег как

def run = f

исправил это для меня. Это сделано точно в ActorPing из lift-util.

Весь код планировщика становится:

object Scheduler {
    private lazy val sched = Executors.newSingleThreadedScheduledExecutor();
    def schedule(f: => Unit, time: Long) {
        sched.schedule(new Runnable {
          def run = f
        }, time - Platform.currentTime, TimeUnit.MILLISECONDS);
    }
}

Я пытался отредактировать сообщение oxbow_lakes, но не смог сохранить его (не работает?), Пока у меня нет прав на комментарии. Поэтому новый пост.

...