Параллельная функция для выполнения поэлементных операций над коллекцией в Scala - PullRequest
0 голосов
/ 10 января 2020

Используя методы более высокого порядка в Scala, я могу выполнять поэлементную операцию над данной коллекцией, как показано ниже

def fun1(l1 :List[Double], l2 :List[Double]) :List[Double] = (l1,l2).zipped.map((x,y) => x + y)

, и, используя императивный способ, я могу выполнять ту же операцию намного быстрее, чем fun1

def fun2(a1: Array[Double], a2: Array[Double]): Array[Double] = {
  val res = new Array[Double](a1.length)
  var i = 0
  while (i < a1.length) {
    res(i) = a1(i) + a2(i)
    i += 1
  }
  res
}

Я хочу написать параллельную функцию, которая выполняет ту же операцию. Используя Scala, могу ли я парализовать любую из вышеперечисленных функций? Если нет, то как мне написать параллельную функцию, которая обеспечивает чистый функциональный параллелизм для поэлементных операций над коллекциями?

Ответы [ 2 ]

1 голос
/ 11 января 2020

Чтобы цена параллелизма окупилась за рабочую нагрузку на элемент, скорее всего, было бы достаточно тяжеловес . Ниже приведены тесты нескольких альтернатив, основанных на последовательных коллекциях, параллельных коллекциях и Future s. Мы сравниваем как легкую рабочую нагрузку (добавление двух чисел), так и симуляцию тяжелой операции с Thread.sleep(1):

  • sequentialArray: последовательно работаем с Array (на основе Travis )
  • futureArray: логически разбить Array на куски и иметь отдельную Future работу для каждого куска
  • parallelArray: использовать параллельные коллекции работать на Array (на основе Luis , axel22 )
  • parallelListZip: использовать параллельные коллекции для zip и работать на List ( основано на axel22 )

Реализация:

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
class So59685582 {
  val simulateHeavyWorkload = ???
  val length = ???
  val as = Array.fill(length)(math.random)
  val bs = Array.fill(length)(math.random)

  def sequentialArray(as: Array[Double], bs: Array[Double]): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    var i = 0
    while (i < length) {
      if (simulateHeavyWorkload) Thread.sleep(1)
      out(i) = as(i) * bs(i)
      i += 1
    }
    out
  }

  def futureArray(as: Array[Double], bs: Array[Double], numThreads: Int): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    val chunkSize = length / numThreads
    val fs =
      (0 until numThreads).map { t =>
        var i = t * chunkSize
        val to = ((t + 1) * chunkSize)
        Future {
          while (i <= to) {
            if (simulateHeavyWorkload) Thread.sleep(1)
            out(i) = as(i) * bs(i)
            i += 1
          }
        }
      }
    Await.ready(Future.sequence(fs), Duration.Inf)
    out
  }

  def parallelArray(as: Array[Double], bs: Array[Double]): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    (0 until length).par.foreach { i =>
      if (simulateHeavyWorkload) Thread.sleep(1)
      out(i) = as(i) + bs(i)
    }
    out
  }

  def parallelListZip(as: List[Double], bs: List[Double]): List[Double] = {
    as.par.zip(bs.par).map { case (a, b) =>
      if (simulateHeavyWorkload) Thread.sleep(1)
      a + b
    }.to(List)
  }

  @Benchmark def _sequentialArray: Array[Double] = sequentialArray(as, bs)
  @Benchmark def _futureArray: Array[Double] = futureArray(as, bs, numThreads = 12)
  @Benchmark def _parallelArray: Array[Double] = parallelArray(as, bs)
  @Benchmark def _parallelListZip: List[Double] = parallelListZip(as.toList, bs.toList)
}

Результаты sbt "jmh:run -i 10 -wi 10 -f 2 -t 1 bench.So59685582":

Тест 1

val simulateHeavyWorkload = true
val length = 1000

[info] Benchmark                     Mode  Cnt  Score   Error  Units
[info] So59685582._futureArray      thrpt   20  9.251 ± 0.034  ops/s
[info] So59685582._parallelArray    thrpt   20  6.493 ± 0.175  ops/s
[info] So59685582._parallelListZip  thrpt   20  6.379 ± 0.117  ops/s
[info] So59685582._sequentialArray  thrpt   20  0.790 ± 0.007  ops/s

Тест 2

val simulateHeavyWorkload = false
val length = 1000

[info] So59685582._futureArray      thrpt   20    27097.347 ±   369.995  ops/s
[info] So59685582._parallelArray    thrpt   20    17864.004 ±   163.846  ops/s
[info] So59685582._parallelListZip  thrpt   20     2942.416 ±   108.180  ops/s
[info] So59685582._sequentialArray  thrpt   20  1773303.066 ± 55856.225  ops/s

Тест 3

val simulateHeavyWorkload = false
val length = 10000000

[info] Benchmark                     Mode  Cnt   Score   Error  Units
[info] So59685582._futureArray      thrpt   20  50.271 ± 1.444  ops/s
[info] So59685582._parallelArray    thrpt   20  53.998 ± 1.397  ops/s
[info] So59685582._parallelListZip  thrpt   20   0.167 ± 0.040  ops/s
[info] So59685582._sequentialArray  thrpt   20  55.183 ± 1.025  ops/s

Результаты

  • Когда операция была облегченной, последовательная обработка выполнялась лучше или приблизительно равной параллельной обработке, даже при большом размере 10000000 элементов.
  • Когда размер был небольшим (1000), параллельная обработка на легком весе была на несколько порядков медленнее t операция
  • Когда операция выполнялась в тяжелом весе, параллельная обработка выполнялась лучше, чем последовательная, в пределах порядка
  • futureArray выполнялась лучше всего, когда число потоков составляло 12, что является числом ядер на моем компьютере согласно availableProcessors. Более того, это привело к снижению производительности.
  • parallelListZip, использующий List, zip и map, имел производительность, аналогичную parallelArray, которая использует изменчивость и l oop при работе был тяжелым, и размер не был слишком большим (1000).
1 голос
/ 11 января 2020
sergey$ SBT_OPTS="-Xmx2G -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xss2M  -Duser.timezone=GMT" sbt console
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=2G; support was removed in 8.0
[info] Loading global plugins from /Users/rsergey/.sbt/1.0/plugins
[info] Loading project definition from /Users/rsergey/project
[info] Set current project to rsergey (in build file:/Users/rsergey/)
[info] Starting scala interpreter...
Welcome to Scala 2.12.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231).
Type in expressions for evaluation. Or try :help.

scala> def fun1(l1 :List[Double], l2 :List[Double]) :List[Double] = (l1,l2).zipped.map((x,y) => x + y)
fun1: (l1: List[Double], l2: List[Double])List[Double]

scala> import scala.collection.parallel.immutable.ParSeq
import scala.collection.parallel.immutable.ParSeq

scala> def parFun1(l1: ParSeq[Double], l2: ParSeq[Double]) = l1.zip(l2).map{case (x,y)=>(x+y)}
parFun1: (l1: scala.collection.parallel.immutable.ParSeq[Double], l2: scala.collection.parallel.immutable.ParSeq[Double])scala.collection.parallel.immutable.ParSeq[Double]


scala> val l1 = Range(0,5000000).map(_.toDouble).toList
l1: List[Double] = List(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, 40.0, 41.0, 42.0, 43.0, 44.0, 45.0, 46.0, 47.0, 48.0, 49.0, 50.0, 51.0, 52.0, 53.0, 54.0, 55.0, 56.0, 57.0, 58.0, 59.0, 60.0, 61.0, 62.0, 63.0, 64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0, 71.0, 72.0, 73.0, 74.0, 75.0, 76.0, 77.0, 78.0, 79.0, 80.0, 81.0, 82.0, 83.0, 84.0, 85.0, 86.0, 87.0, 88.0, 89.0, 90.0, 91.0, 92.0, 93.0, 94.0, 95.0, 96.0, 97.0, 98.0, 99.0, 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0, 110.0, 111.0, 112.0, 113.0, 114.0, 115.0, 116.0, 117.0, 118.0, 119.0, 120.0, 121.0, 122...

scala> val l2 = Range(-5000000, 0).map(_.toDouble).toList
l2: List[Double] = List(-5000000.0, -4999999.0, -4999998.0, -4999997.0, -4999996.0, -4999995.0, -4999994.0, -4999993.0, -4999992.0, -4999991.0, -4999990.0, -4999989.0, -4999988.0, -4999987.0, -4999986.0, -4999985.0, -4999984.0, -4999983.0, -4999982.0, -4999981.0, -4999980.0, -4999979.0, -4999978.0, -4999977.0, -4999976.0, -4999975.0, -4999974.0, -4999973.0, -4999972.0, -4999971.0, -4999970.0, -4999969.0, -4999968.0, -4999967.0, -4999966.0, -4999965.0, -4999964.0, -4999963.0, -4999962.0, -4999961.0, -4999960.0, -4999959.0, -4999958.0, -4999957.0, -4999956.0, -4999955.0, -4999954.0, -4999953.0, -4999952.0, -4999951.0, -4999950.0, -4999949.0, -4999948.0, -4999947.0, -4999946.0, -4999945.0, -4999944.0, -4999943.0, -4999942.0, -4999941.0, -4999940.0, -4999939.0, -49...

scala> val l1par = l1.par
l1par: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, 40.0, 41.0, 42.0, 43.0, 44.0, 45.0, 46.0, 47.0, 48.0, 49.0, 50.0, 51.0, 52.0, 53.0, 54.0, 55.0, 56.0, 57.0, 58.0, 59.0, 60.0, 61.0, 62.0, 63.0, 64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0, 71.0, 72.0, 73.0, 74.0, 75.0, 76.0, 77.0, 78.0, 79.0, 80.0, 81.0, 82.0, 83.0, 84.0, 85.0, 86.0, 87.0, 88.0, 89.0, 90.0, 91.0, 92.0, 93.0, 94.0, 95.0, 96.0, 97.0, 98.0, 99.0, 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0, 110.0, 111.0, 112.0, 113.0, 114.0, 115.0,...

scala> val l2par = l2.par
l2par: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(-5000000.0, -4999999.0, -4999998.0, -4999997.0, -4999996.0, -4999995.0, -4999994.0, -4999993.0, -4999992.0, -4999991.0, -4999990.0, -4999989.0, -4999988.0, -4999987.0, -4999986.0, -4999985.0, -4999984.0, -4999983.0, -4999982.0, -4999981.0, -4999980.0, -4999979.0, -4999978.0, -4999977.0, -4999976.0, -4999975.0, -4999974.0, -4999973.0, -4999972.0, -4999971.0, -4999970.0, -4999969.0, -4999968.0, -4999967.0, -4999966.0, -4999965.0, -4999964.0, -4999963.0, -4999962.0, -4999961.0, -4999960.0, -4999959.0, -4999958.0, -4999957.0, -4999956.0, -4999955.0, -4999954.0, -4999953.0, -4999952.0, -4999951.0, -4999950.0, -4999949.0, -4999948.0, -4999947.0, -4999946.0, -4999945.0, -4999944.0, -4999943.0, -4999...

scala> def time[R](block: => R): R = {val t0 = System.nanoTime(); val result = block; val t1 = System.nanoTime(); println("Elapsed time: " + (t1 - t0) + "ns"); result }
time: [R](block: => R)R

scala> time { fun1(l1, l2) }
Elapsed time: 3928108671ns
res2: List[Double] = List(-5000000.0, -4999998.0, -4999996.0, -4999994.0, -4999992.0, -4999990.0, -4999988.0, -4999986.0, -4999984.0, -4999982.0, -4999980.0, -4999978.0, -4999976.0, -4999974.0, -4999972.0, -4999970.0, -4999968.0, -4999966.0, -4999964.0, -4999962.0, -4999960.0, -4999958.0, -4999956.0, -4999954.0, -4999952.0, -4999950.0, -4999948.0, -4999946.0, -4999944.0, -4999942.0, -4999940.0, -4999938.0, -4999936.0, -4999934.0, -4999932.0, -4999930.0, -4999928.0, -4999926.0, -4999924.0, -4999922.0, -4999920.0, -4999918.0, -4999916.0, -4999914.0, -4999912.0, -4999910.0, -4999908.0, -4999906.0, -4999904.0, -4999902.0, -4999900.0, -4999898.0, -4999896.0, -4999894.0, -4999892.0, -4999890.0, -4999888.0, -4999886.0, -4999884.0, -4999882.0, -4999880.0, -4999878.0, -...


scala> time { parFun1(l1par, l2par) }
Elapsed time: 292256058ns
res5: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(-5000000.0, -4999998.0, -4999996.0, -4999994.0, -4999992.0, -4999990.0, -4999988.0, -4999986.0, -4999984.0, -4999982.0, -4999980.0, -4999978.0, -4999976.0, -4999974.0, -4999972.0, -4999970.0, -4999968.0, -4999966.0, -4999964.0, -4999962.0, -4999960.0, -4999958.0, -4999956.0, -4999954.0, -4999952.0, -4999950.0, -4999948.0, -4999946.0, -4999944.0, -4999942.0, -4999940.0, -4999938.0, -4999936.0, -4999934.0, -4999932.0, -4999930.0, -4999928.0, -4999926.0, -4999924.0, -4999922.0, -4999920.0, -4999918.0, -4999916.0, -4999914.0, -4999912.0, -4999910.0, -4999908.0, -4999906.0, -4999904.0, -4999902.0, -4999900.0, -4999898.0, -4999896.0, -4999894.0, -4999892.0, -4999890.0, -4999888.0, -4999886.0, -49998...
...