Акка - балансировка нагрузки и полноценное использование процессора - PullRequest
0 голосов
/ 06 декабря 2011

Я написал алгоритм умножения матриц, который использует параллельные коллекции, чтобы ускорить умножение.

Это выглядит так:

(0 until M1_ROWS).grouped(PARTITION_ROWS).toList.par.map( i => 
  singleThreadedMultiplicationFAST(i.toArray.map(m1(_)), m2) 
).reduce(_++_) 

Теперь я хотел бы сделать то же самое в Akka, поэтому я сделал следующее:

val multiplyer = actorOf[Pool] 
multiplyer start 
val futures = (0 until M1_ROWS).grouped(PARTITION_ROWS).map( i => 
  multiplyer ? MultiplyMatrix(i.toArray.map(m1(_)), m2) 
) 
futures.map(_.get match { case res :Array[Array[Double]] => res }).reduce(_++_) 

class Multiplyer extends akka.actor.Actor{ 
  protected def receive = { 
    case MultiplyMatrix(m1, m2) => self reply singleThreadedMultiplicationFAST (m1,m2) 
  } 
} 
class Pool extends Actor with DefaultActorPool 
  with FixedCapacityStrategy with RoundRobinSelector { 

  def receive = _route 
  def partialFill = false 
  def selectionCount = 1 
  def instance = actorOf[Multiplyer] 
  def limit = 32 // I tried 256 with no effect either 
} 

Оказалось, чтоверсия этого алгоритма, основанная на актерах, использует только 200% моего песчаного моста i7, в то время как версия параллельных коллекций использует 600% процессора и в 4-5 раз быстрее.Я подумал, что это может быть диспетчер и попробовал это:

self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, mailboxCapacity = 100) 

и это (я поделился этим между актерами):

val messageDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("d1")
  .withNewBoundedThrea dPoolWithLinkedBlockingQueueWithUnboundedCapacity(100)
  .setCorePoolSize(16)
  .setMaxPoolSize(128)
  .setKeepAliveTimeInMillis(60000).build 

Но я не заметил никаких изменений.По-прежнему используется только 200% процессора, а алгоритм работает в 4-5 раз медленнее, чем версия с параллельными коллекциями.

Я уверен, что делаю что-то глупое, поэтому, пожалуйста, помогите !!! :)

1 Ответ

2 голосов
/ 06 декабря 2011

Это выражение:

val futures = (0 until M1_ROWS).grouped(PARTITION_ROWS).map( i => 
  multiplyer ? MultiplyMatrix(i.toArray.map(m1(_)), m2) 
) 

создает ленивую коллекцию, поэтому ваш _.get делает всю вашу программу последовательной.Поэтому решение состоит в том, чтобы сделать это выражение строгим, добавив toList или аналогичный.

...