// I have hundreds of tasks converting inputs into outputs, which should be persisted.
case class Op(i: Int)
case class Output(i: Int)
val inputs: Seq[Op] = ??? // Number of inputs is huge
def executeLongRunning(op: Op): Output = {
Thread.sleep(Random.nextInt(1000) + 1000) // I cannot predict which tasks will finish first
println("<==", op)
Output(op.i)
}
def executeSingleThreadedSave(outputs: Seq[Output]): Unit = {
synchronized { // Problem is, persisting output is itself a long-running process,
// which cannot be parallelized (internally uses blocking queue).
Thread.sleep(5000) // persist time is independent of outputs.size
println("==>", outputs) // Order of persisted records does not matter
}
}
// TODO: this needs to be implemented
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = ???
val eventualOutputs: Seq[Future[Output]] = inputs.map((input: Op) => Future(executeLongRunning(input)))
magicSaver(eventualOutputs, executeSingleThreadedSave)
Я мог бы реализовать magicSaver
как:
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
saver(Await.result(Future.sequence(eventualOutputs), Duration.Inf))
}
Но у этого есть серьезный недостаток, заключающийся в том, что мы ждем, пока все входные данные будут обработаны, прежде чем мы начнем сохранять выходные данные, что не идеален с точки зрения отказоустойчивости.
Другая реализация:
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
eventualOutputs.foreach(_.onSuccess { case output: Output => saver(Seq(output)) })
}
, но это увеличивает время выполнения до inputs.size * 5secs (из-за синхронизированного характера, который не является приемлемо.
Мне нужен способ объединить уже завершенные фьючерсы , когда количество таких фьючерсов достигнет некоторого компромиссного размера (например, 100), но я не уверен как это сделать чисто без явного кодирования журнала опроса c:
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
def waitFor100CompletedFutures(eventualOutputs: Seq[Future[Output]]): (Seq[Output], Seq[Future[Output]]) = {
var completedCount: Int = 0
do {
completedCount = eventualOutputs.count(_.isCompleted)
Thread.sleep(100)
} while ((completedCount < 100) && (completedCount != eventualOutputs.size))
val (completed: Seq[Future[Output]], remaining: Seq[Future[Output]]) = eventualOutputs.partition(_.isCompleted)
(Await.result(Future.sequence(completed), Duration.Inf), remaining)
}
var completed: Seq[Output] = null
var remaining: Seq[Future[Output]] = eventualOutputs
do {
(completed: Seq[Output], remaining: Seq[Future[Output]]) = waitFor100CompletedFutures(remaining)
saver(completed)
} while (remaining.nonEmpty)
}
Любое элегантное решение, которое мне здесь не хватает?