Как эффективно сочетать будущие результаты как будущее - PullRequest
2 голосов
/ 05 мая 2019

У меня много расчетов, способствующих окончательному результату, без ограничений по порядку взносов.Похоже, что фьючерсы должны быть в состоянии ускорить это, и они делают, но не так, как я думал, что они будут.Вот код, сравнивающий производительность очень глупого способа деления целых чисел:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object scale_me_up {
  def main(args: Array[String]) {
    val M = 500 * 1000
    val N = 5
    Thread.sleep(3210) // let launcher settle down
    for (it <- 0 until 15) {
      val method = it % 3
      val start = System.currentTimeMillis()
      val result = divide(M, N, method)
      val elapsed = System.currentTimeMillis() - start
      assert(result == M / N)
      if (it >= 6) {
        val methods = Array("ordinary", "fast parallel", "nice parallel")
        val name = methods(method)
        println(f"$name%15s: $elapsed ms")
      }
    }
  }

  def is_multiple_of(m: Int, n: Int): Boolean = {
    val result = !(1 until n).map(_ + (m / n) * n).toSet.contains(m)
    assert(result == (m % n == 0)) // yes, a less crazy implementation exists
    result
  }

  def divide(m: Int, n: Int, method: Int): Int = {
    method match {
      case 0 =>
        (1 to m).count(is_multiple_of(_, n))
      case 1 =>
        (1 to m)
          .map { x =>
            Future { is_multiple_of(x, n) }
          }
          .count(Await.result(_, Duration.Inf))
      case 2 =>
        Await.result(divide_futuristically(m, n), Duration.Inf)
    }
  }

  def divide_futuristically(m: Int, n: Int): Future[Int] = {
    val futures = (1 to m).map { x =>
      Future { is_multiple_of(x, n) }
    }
    Future.foldLeft(futures)(0) { (count, flag) =>
      { if (flag) { count + 1 } else { count } }
    }
    /* much worse performing alternative:
    Future.sequence(futures).map(_.count(identity))
    */
  }
}

Когда я запускаю это, параллельная case 1 несколько быстрее, чем обычный код case 0 (ура), но case 2 занимает в два раза больше времениКонечно, это зависит от системы и от того, нужно ли в будущем делать достаточно работы (которая здесь растет вместе со знаменателем N).[PS] Как и ожидалось, уменьшение N дает case 0 опережение, а достаточное увеличение N делает и case 1, и case 2 примерно в два раза быстрее, чем case 0 на моем двухъядерном процессоре.

Япривести к убеждению, что divide_futuristically - лучший способ выразить этот вид вычисления: возвращение будущего с объединенным результатом.Блокировка - это то, что нам нужно для измерения производительности.Но на самом деле, чем больше блокировок, тем быстрее все заканчивают.Что я делаю неправильно?Несколько вариантов суммирования фьючерсов (например, sequence ) имеют одинаковое наказание.

1 Ответ

2 голосов
/ 05 мая 2019

Кажется, ты все сделал правильно. Я сам пробовал разные подходы, даже .par, но получил тот же или худший результат.

Я погрузился в Future.foldLeft и попытался проанализировать причину задержки:

  /** A non-blocking, asynchronous left fold over the specified futures,
   *  with the start value of the given zero.
   *  The fold is performed asynchronously in left-to-right order as the futures become completed.
   *  The result will be the first failure of any of the futures, or any failure in the actual fold,
   *  or the result of the fold.
   *
   *  Example:
   *  {{{
   *    val futureSum = Future.foldLeft(futures)(0)(_ + _)
   *  }}}
   *
   * @tparam T       the type of the value of the input Futures
   * @tparam R       the type of the value of the returned `Future`
   * @param futures  the `scala.collection.immutable.Iterable` of Futures to be folded
   * @param zero     the start value of the fold
   * @param op       the fold operation to be applied to the zero and futures
   * @return         the `Future` holding the result of the fold
   */
  def foldLeft[T, R](futures: scala.collection.immutable.Iterable[Future[T]])(zero: R)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
    foldNext(futures.iterator, zero, op)

  private[this] def foldNext[T, R](i: Iterator[Future[T]], prevValue: R, op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
    if (!i.hasNext) successful(prevValue)
    else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) }

и эта часть:

else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) }

.flatMap создает новое Будущее, которое отправляется на executor. Другими словами, каждый

    { (count, flag) =>
      { if (flag) { count + 1 } else { count } }
    }

исполняется как новое будущее.

Полагаю, эта часть вызывает экспериментально подтвержденную задержку.

...