Не требуйте (не применяйте) свои Future
s, так как это заставляет их блокировать и ждать ответа; как вы видели, это может привести к тупикам. Вместо этого используйте их монадически, чтобы сказать им, что делать, когда они завершат. Вместо:
val result1 = f1()
val result2 = f2()
merge(result1,result2)
Попробуйте это:
for {
result1 <- f1
result2 <- f2
} yield merge(result1, result2)
Результатом этого будет Responder[Result]
(по существу, Future[Result]
), содержащий объединенные результаты; вы можете сделать что-то эффективное с этим последним значением, используя respond()
или foreach()
, или вы можете map()
или flatMap()
сделать это с другим Responder[T]
. Блокировка не требуется, просто продолжайте планировать вычисления на будущее!
Редактировать 1:
Хорошо, подпись функции compute
теперь должна измениться на Responder[Result]
, так как это влияет на рекурсивные вызовы? Давайте попробуем это:
private def compute( input: Input ):Responder[Result] = {
if( pairs.size < SIZE_LIMIT ) {
future(computeSequential())
} else {
val (input1,input2) = input.split
for {
result1 <- compute(input1)
result2 <- compute(input2)
} yield merge(result1, result2)
}
}
Теперь вам больше не нужно заключать вызовы в compute
с помощью future(...)
, потому что они уже возвращают Responder
(суперкласс Future
).
Редактировать 2:
Одним из результатов использования этого стиля прохождения продолжения является то, что ваш код верхнего уровня - как бы он ни вызывал compute
изначально - больше не блокируется. Если он вызывается из main()
, и это все, что делает программа, это будет проблемой, потому что теперь он просто создаст кучу фьючерсов, а затем сразу же закроется, завершив все, что было сказано. Вам нужно block
на всех этих фьючерсах, но только один раз, на верхнем уровне и только на результатах всех вычислений, а не промежуточных.
К сожалению, эта Responder
вещь, возвращаемая compute()
, больше не имеет блокирующего apply()
метода, как Future
. Я не уверен, почему flatMapping Future
s генерирует общий Responder
вместо Future
; это похоже на ошибку API. Но, в любом случае, вы сможете сделать свое собственное:
def claim[A](r:Responder[A]):A = {
import java.util.concurrent.ArrayBlockingQueue
import scala.actors.Actor.actor
val q = new ArrayBlockingQueue[A](1)
// uses of 'respond' need to be wrapped in an actor or future block
actor { r.respond(a => q.put(a)) }
return q.take
}
Так что теперь вы можете создать блокирующий вызов для вычисления в вашем main
методе так:
val finalResult = claim(compute(input))