Scala: Обработка фьючерсов партиями, отсортированными по (приблизительному) времени завершения - PullRequest
0 голосов
/ 06 мая 2020
// I have hundreds of tasks converting inputs into outputs, which should be persisted.

case class Op(i: Int)
case class Output(i: Int)
val inputs: Seq[Op] = ??? // Number of inputs is huge

def executeLongRunning(op: Op): Output = {
  Thread.sleep(Random.nextInt(1000) + 1000) // I cannot predict which tasks will finish first
  println("<==", op)
  Output(op.i)
}
def executeSingleThreadedSave(outputs: Seq[Output]): Unit = {
  synchronized { // Problem is, persisting output is itself a long-running process, 
                 // which cannot be parallelized (internally uses blocking queue).
    Thread.sleep(5000) // persist time is independent of outputs.size
    println("==>", outputs) // Order of persisted records does not matter
  }
}

// TODO: this needs to be implemented
def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = ??? 

val eventualOutputs: Seq[Future[Output]] = inputs.map((input: Op) => Future(executeLongRunning(input)))

magicSaver(eventualOutputs, executeSingleThreadedSave)

Я мог бы реализовать magicSaver как:

def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
  saver(Await.result(Future.sequence(eventualOutputs), Duration.Inf))
}

Но у этого есть серьезный недостаток, заключающийся в том, что мы ждем, пока все входные данные будут обработаны, прежде чем мы начнем сохранять выходные данные, что не идеален с точки зрения отказоустойчивости.

Другая реализация:

def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
  eventualOutputs.foreach(_.onSuccess { case output: Output => saver(Seq(output)) })
}

, но это увеличивает время выполнения до inputs.size * 5secs (из-за синхронизированного характера, который не является приемлемо.

Мне нужен способ объединить уже завершенные фьючерсы , когда количество таких фьючерсов достигнет некоторого компромиссного размера (например, 100), но я не уверен как это сделать чисто без явного кодирования журнала опроса c:

def magicSaver(eventualOutputs: Seq[Future[Output]], saver: Seq[Output] => Unit): Unit = {
  def waitFor100CompletedFutures(eventualOutputs: Seq[Future[Output]]): (Seq[Output], Seq[Future[Output]]) = {
    var completedCount: Int = 0
    do {
      completedCount = eventualOutputs.count(_.isCompleted)
      Thread.sleep(100)
    } while ((completedCount < 100) && (completedCount != eventualOutputs.size))
    val (completed: Seq[Future[Output]], remaining: Seq[Future[Output]]) = eventualOutputs.partition(_.isCompleted)
    (Await.result(Future.sequence(completed), Duration.Inf), remaining)
  }

  var completed: Seq[Output] = null
  var remaining: Seq[Future[Output]] = eventualOutputs
  do {
    (completed: Seq[Output], remaining: Seq[Future[Output]]) = waitFor100CompletedFutures(remaining)
    saver(completed)
  } while (remaining.nonEmpty)
}

Любое элегантное решение, которое мне здесь не хватает?

1 Ответ

0 голосов
/ 07 мая 2020

Я размещаю здесь свое решение для справки. Его преимущество состоит в том, что он полностью избегает пакетирования и вызывает processOutput, как только вывод становится доступным, что является лучшей ситуацией при описанных мною ограничениях.

def magicSaver[T, R](eventualOutputs: Seq[Future[T]], 
                     processOutput: Seq[T] => R)(implicit ec: ExecutionContext): Seq[R] = {
  logInfo(s"Size of outputs to save: ${eventualOutputs.size}")

  var remaining: Seq[Future[T]] = eventualOutputs
  val processorOutput: mutable.ListBuffer[R] = new mutable.ListBuffer[R]
  do {
    val (currentCompleted: Seq[Future[T]], currentRemaining: Seq[Future[T]]) = remaining.partition(_.isCompleted)
    if (remaining.size == currentRemaining.size) {
      Thread.sleep(100)
    } else {
      logInfo(s"Got ${currentCompleted.size} completed records, remaining ${currentRemaining.size}")
      val completed = currentCompleted.map(Await.result(_, Duration.Zero))
      processorOutput.append(processOutput(completed))
    }
    remaining = currentRemaining
  } while (remaining.nonEmpty)
  processorOutput
}
...