Реагируйте на фьючерсы - PullRequest
7 голосов
/ 04 октября 2010

Я пытаюсь использовать подход «разделяй и властвуй» (он же форк / соединение) для решения проблемы перебора чисел. Вот код:

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}

Он работает (с хорошим ускорением), но метод применения в будущем, кажется, блокирует поток, и пул потоков значительно увеличивается. И когда создается слишком много потоков, вычисления застряли.

Существует ли своего рода метод реаги для фьючерсов, который освобождает поток? Или любой другой способ добиться такого поведения?

РЕДАКТИРОВАТЬ: Я использую Scala 2.8.0.final

1 Ответ

8 голосов
/ 04 октября 2010

Не требуйте (не применяйте) свои Future s, так как это заставляет их блокировать и ждать ответа; как вы видели, это может привести к тупикам. Вместо этого используйте их монадически, чтобы сказать им, что делать, когда они завершат. Вместо:

val result1 = f1()
val result2 = f2()
merge(result1,result2)

Попробуйте это:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)

Результатом этого будет Responder[Result] (по существу, Future[Result]), содержащий объединенные результаты; вы можете сделать что-то эффективное с этим последним значением, используя respond() или foreach(), или вы можете map() или flatMap() сделать это с другим Responder[T]. Блокировка не требуется, просто продолжайте планировать вычисления на будущее!

Редактировать 1:

Хорошо, подпись функции compute теперь должна измениться на Responder[Result], так как это влияет на рекурсивные вызовы? Давайте попробуем это:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}

Теперь вам больше не нужно заключать вызовы в compute с помощью future(...), потому что они уже возвращают Responder (суперкласс Future).

Редактировать 2:

Одним из результатов использования этого стиля прохождения продолжения является то, что ваш код верхнего уровня - как бы он ни вызывал compute изначально - больше не блокируется. Если он вызывается из main(), и это все, что делает программа, это будет проблемой, потому что теперь он просто создаст кучу фьючерсов, а затем сразу же закроется, завершив все, что было сказано. Вам нужно block на всех этих фьючерсах, но только один раз, на верхнем уровне и только на результатах всех вычислений, а не промежуточных.

К сожалению, эта Responder вещь, возвращаемая compute(), больше не имеет блокирующего apply() метода, как Future. Я не уверен, почему flatMapping Future s генерирует общий Responder вместо Future; это похоже на ошибку API. Но, в любом случае, вы сможете сделать свое собственное:

def claim[A](r:Responder[A]):A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) } 
  return q.take
}

Так что теперь вы можете создать блокирующий вызов для вычисления в вашем main методе так:

val finalResult = claim(compute(input))
...