Как я могу получить первого законченного Актера в группе Актеров в Scala? - PullRequest
4 голосов
/ 03 ноября 2011

У меня умеренное количество длительно действующих актеров, и я хочу написать синхронную функцию, которая возвращает первую из них, которая завершается. Я могу сделать это с ожиданием вращения на фьючерсах (например,:

while (! fs.exists(f => f.isSet) ) {
  Thread.sleep(100)
}
val completeds = fs.filter(f => f.isSet)
completeds.head()

), но это кажется "неактерным"

Класс scala.actors.Futures имеет два метода awaitAll() и awaitEither(), которые кажутся ужасно близкими; если бы был awaitAny(), я бы запрыгнул на него. Я упускаю простой способ сделать это или есть общий шаблон, который применим?

Ответы [ 2 ]

2 голосов
/ 03 ноября 2011

Более «актёрский» способ ожидания завершения - создание актера, отвечающего за обработку завершенного результата (назовем его ResultHandler)

Вместо ответа рабочие отправляют свой ответ на ResultHandler вманера огня и забывания.Последний продолжит обработку результата, пока другие работники завершат свою работу.

0 голосов
/ 03 ноября 2011

Ключом для меня стало обнаружение того, что каждый (?) Объект Scala неявно является актером, поэтому вы можете использовать Actor.react{ } для блокировки.Вот мой исходный код:

    import scala.actors._
    import scala.actors.Actor._

    //Top-level class that wants to return the first-completed result from some long-running actors
    class ConcurrentQuerier() {
        //Synchronous function; perhaps fulfilling some legacy interface
        def synchronousQuery : String = {
            //Instantiate and start the monitoring Actor
            val progressReporter = new ProgressReporter(self) //All (?) objects are Actors 
            progressReporter.start()

            //Instantiate the long-running Actors, giving each a handle to the monitor
            val lrfs = List ( 
                new LongRunningFunction(0, 2000, progressReporter), new LongRunningFunction(1, 2500, progressReporter), new LongRunningFunction(3, 1500, progressReporter), 
                new LongRunningFunction(4, 1495, progressReporter), new LongRunningFunction(5, 1500, progressReporter), new LongRunningFunction(6, 5000, progressReporter) ) 

            //Start 'em
            lrfs.map{ lrf => 
                lrf.start()
            }
            println("All actors started...")

            val start = System.currentTimeMillis()
            /* 
            This blocks until it receives a String in the Inbox.
            Who sends the string? A: the progressReporter, which is monitoring the LongRunningFunctions
            */ 
            val s = receive {
                  case s:String => s
            }
            println("Received " + s + " after " + (System.currentTimeMillis() - start) + " ms")
            s
        }
    }

    /* 
    An Actor that reacts to a message that is a tuple ("COMPLETED", someResult) and sends the
    result to this Actor's owner. Not strictly necessary (the LongRunningFunctions could post
    directly to the owner's mailbox), but I like the idea that monitoring is important enough
    to deserve its own object
*/
    class ProgressReporter(val owner : Actor) extends Actor {
        def act() = {
            println("progressReporter awaiting news...")
            react {
                case ("COMPLETED", s) => 
                    println("progressReporter received a completed signal " + s);
                    owner ! s
                case s => 
                    println("Unexpected message: " + s ); act()
            }
        }
    }

/*
    Some long running function
*/

    class LongRunningFunction(val id : Int, val timeout : Int, val supervisor : Actor) extends Actor {
        def act() = {
            //Do the long-running query
            val s = longRunningQuery()
            println(id.toString + " finished, sending results")
            //Send the results back to the monitoring Actor (the progressReporter)
            supervisor ! ("COMPLETED", s)
        }

        def longRunningQuery() : String = { 
            println("Starting Agent " + id + " with timeout " + timeout)
            Thread.sleep(timeout)
            "Query result from agent " + id
        }
    }


    val cq = new ConcurrentQuerier()
    //I don't think the Actor semantics guarantee that the result is absolutely, positively the first to have posted the "COMPLETED" message
    println("Among the first to finish was : " + cq.synchronousQuery)

Типичные результаты выглядят так:

scala ActorsNoSpin.scala 
progressReporter awaiting news...
All actors started...
Starting Agent 1 with timeout 2500
Starting Agent 5 with timeout 1500
Starting Agent 3 with timeout 1500
Starting Agent 4 with timeout 1495
Starting Agent 6 with timeout 5000
Starting Agent 0 with timeout 2000
4 finished, sending results
progressReporter received a completed signal Query result from agent 4
Received Query result from agent 4 after 1499 ms
Among the first to finish was : Query result from agent 4
5 finished, sending results
3 finished, sending results 
0 finished, sending results
1 finished, sending results
6 finished, sending results
...