Зависимость между операциями в scala актерах - PullRequest
1 голос
/ 17 марта 2010

Я пытаюсь распараллелить код с использованием акторов scala. Это мой первый настоящий код с актерами, но у меня есть некоторый опыт работы с многопоточностью Java и MPI в C. Однако я полностью потерян.

Рабочий процесс, который я хочу реализовать, представляет собой круговой конвейер и может быть описан следующим образом:

  • Каждый рабочий актер имеет ссылку на другого, образуя тем самым круг
  • Существует координатор субъект, который может запустить вычисление, отправив StartWork() сообщение
  • Когда рабочий получает сообщение StartWork(), он обрабатывает некоторые вещи локально и отправляет сообщение DoWork(...) своему соседу по кругу.
  • Соседи делают что-то еще и по очереди отправляют сообщение DoWork(...) своему соседу.
  • Это продолжается до тех пор, пока начальный работник не получит сообщение DoWork().
  • Координатор может отправить GetResult() сообщение первоначальному работнику и ждать ответа.

Дело в том, что координатор должен получать результат только тогда, когда данные готовы. Как работник может дождаться возвращения задания, прежде чем ответить на сообщение GetResult()?

Чтобы ускорить вычисления, любой работник может получить StartWork() в любое время.

Вот моя первая попытка псевдо-реализации работника:

class Worker( neighbor: Worker, numWorkers: Int ) {
   var ready = Foo()
   def act() {
     case StartWork() => { 
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       }
     case DoWork( resultData, remaining ) => if( remaining == 0 ) {
         ready = resultData
       } else {
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      }
    case GetResult() => reply( ready )
  }
}

На стороне координатора:

worker ! StartWork()
val result = worker !? GetResult() // should wait

Ответы [ 2 ]

3 голосов
/ 17 марта 2010

Во-первых, вам явно необходимо иметь какой-то идентификатор того, что составляет единое произведение, чтобы GetResult мог получить правильный результат. Полагаю, очевидное решение состоит в том, чтобы ваши актеры сохраняли Map результатов и Map всех ожидающих получателей :

class Worker( neighbor: Worker, numWorkers: Int ) {
   var res: Map[Long, Result] = Map.empty
   var gets: Map[Long, OutputChannel[Any]] = Map.empty   
   def act() {
     ...
     case DoWork( id, resultData, remaining ) if remaining == 0 =>
       res += (id -> resultData)
       gets.get(id).foreach(_ ! res(id)) //reply to getters when result is ready
       gets -= id //clear out getter map now?
     case GetResult(id) if res.isDefinedAt(d) => //result is ready
       reply (res(id))
     case GetResult(id) => //no result ready 
       gets += (id -> sender)
   }
}

Примечание: использование if в условии соответствия может сделать обработку сообщений более понятной

1 голос
/ 18 марта 2010

Одна альтернатива будет такой:

class Worker( neighbor: Worker, numWorkers: Int ) {
   var ready = Foo()
   def act() {
     case StartWork() => { 
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       }
     case DoWork( resultData, remaining ) => if( remaining == 0 ) {
         ready = resultData
         react {
           case GetResult() => reply( ready )
         }
       } else {
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      }
  }
}

После завершения работы этот работник застрянет, пока не получит сообщение GetResult. С другой стороны, координатор может немедленно отправить GetResult, так как он останется в почтовом ящике, пока работник не получит его.

...