Как отменить актер Акка? - PullRequest
       7

Как отменить актер Акка?

3 голосов
/ 14 сентября 2011

У меня есть актер (работник), который получает запрос и отвечает на него.Обработка запроса может занять 3-60 минут.Абонент (также актер) в настоящее время использует !!!и ожидая future.get, однако дизайн актера Caller может быть изменен при необходимости.Кроме того, в настоящее время я использую диспетчер EventDriven.

Как отменить (инициированный пользователем) обработку запроса, чтобы рабочий субъект был освобожден и вернулся в состояние готовности для получения новых запросов?Я надеялся на метод, подобный методу отмены java.util.concurrent.Future, но не смог найти в Akka 1.1.3

Редактировать:

Мы пыталисьполучить поведение, которое мы ищем с помощью completeWithException:

object Cancel {
  def main(args: Array[String]) {
    val actor = Actor.actorOf[CancelActor].start
    EventHandler.info(this, "Getting future")
    val future = (actor ? "request").onComplete(x => EventHandler.info(this, "Completed!! " + x.get))
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.completeWithException(new Exception("cancel"))
    EventHandler.info(this, "Future is " + future.get)
  }
}

class CancelActor extends Actor {
  def receive = {
    case "request" =>
      EventHandler.info(this, "start")
      (1 to 5).foreach(x => {
        EventHandler.info(this, "I am a long running process")
        Thread.sleep(200L)
      })
      self reply "response"
      EventHandler.info(this, "stop")
  }
}

Но это не остановило длительный процесс.

    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Getting future
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] start
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Cancelling
    [ERROR]   [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-7] [ActorCompletableFuture] 
    java.lang.Exception: cancel
        at kozo.experimental.Cancel$.main(Cancel.scala:15)
...

    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] stop

Напротив, рассмотрим поведение java.util.concurrent.Future:

object Cancel2 {
  def main(args: Array[String]) {
    val executor: ExecutorService = Executors.newSingleThreadExecutor()
    EventHandler.info(this, "Getting future")
    val future = executor.submit(new Runnable {
      def run() {
        EventHandler.info(this, "start")
        (1 to 5).foreach(x => {
          EventHandler.info(this, "I am a long running process")
          Thread.sleep(200L)
        })
      }
    })
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.cancel(true)
    EventHandler.info(this, "Future is " + future.get)
  }
}

, который останавливает длительный процесс

    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Getting future
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] start
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    Exception in thread "main" java.util.concurrent.CancellationException
...
    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Cancelling

Ответы [ 2 ]

2 голосов
/ 17 сентября 2011

Вы также можете проверить статус будущего в актере.

class MyActor extends Actor {
  def receive = {
    case msg =>
      while(!self.senderFuture.get.isCompleted) {
        performWork(msg)
      }
      self reply result
  }
  ...
}

Для этого необходимо отправить сообщение с '?' или «спроси» хотя. Надеюсь, поможет.

1 голос
/ 15 сентября 2011

Если вы просто в VM, вы можете просто передать AtomicBoolean с вашим сообщением Job и периодически проверять его в вашем актере, чтобы увидеть, следует ли вам прервать его.

actor ! Job(..., someAtomicBoolean)

class MyActor extends Actor {
  def receive = {
    case Job(..., cancelPlease) =>
      while(cancelPlease.get == false) {
        performWork
      }
      self reply result
  }
}
...