Почему я получаю состояние гонки в многопоточном скале? - PullRequest
0 голосов
/ 09 июня 2018

Я пытаюсь распараллелить вычисление p-нормы по массиву.

Чтобы добиться этого, я попробую следующее, я понимаю, что могу решить это по-другому, но мне интересно понять, где происходит состояние гонки,

val toSum = Array(0,1,2,3,4,5,6)

// Calculate the sum over a segment of an array
def sumSegment(a: Array[Int], p:Double, s: Int, t: Int): Int = {
  val res = {for (i <- s until t) yield scala.math.pow(a(i), p)}.reduceLeft(_ + _)
  res.toInt
}

// Calculate the p-norm over an Array a
def parallelpNorm(a: Array[Int], p: Double): Double = {
  var acc = 0L

  // The worker who should calculate the sum over a slice of an array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run() {
      // Calculate the sum over the slice
      val subsum = sumSegment(a, p, s, t)
      // Add the sum of the slice to the accumulator in a synchronized fashion
      val x = new AnyRef{}
      x.synchronized {
        acc = acc + subsum
      }
    }
  }

  val split = a.size  / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.size)
  seg_one.start
  seg_two.start
  seg_one.join
  seg_two.join
  scala.math.pow(acc, 1.0 / p)
}
println(parallelpNorm(toSum, 2))

Ожидаемый результат равен 9.5393920142, но вместо этого некоторыепробеги дают мне 9.273618495495704 или даже 2.23606797749979.

Какие-нибудь рекомендации, где могло бы произойти состояние гонки?

Ответы [ 2 ]

0 голосов
/ 10 июня 2018

Проблема была объяснена в предыдущем ответе, но лучший способ избежать этого состояния гонки и повысить производительность - это использовать AtomicInteger

// Calculate the p-norm over an Array a
def parallelpNorm(a: Array[Int], p: Double): Double = {
  val acc = new AtomicInteger(0)

  // The worker who should calculate the sum over a slice of an array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run() {
      // Calculate the sum over the slice
      val subsum = sumSegment(a, p, s, t)
      // Add the sum of the slice to the accumulator in a synchronized fashion
      acc.getAndAdd(subsum)
    }
  }

  val split = a.length / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.length)
  seg_one.start()
  seg_two.start()
  seg_one.join()
  seg_two.join()
  scala.math.pow(acc.get, 1.0 / p)
}

Современные процессоры могут выполнять атомарные операции без блокировки, котораяможет быть намного быстрее, чем явная синхронизация.В моих тестах это работает в два раза быстрее исходного кода (при правильном размещении x)

0 голосов
/ 09 июня 2018

Переместить val x = new AnyRef{} за пределы sumSegmenter (то есть в parallelpNorm) - проблема в том, что каждый поток использует свой собственный мьютекс, а не разделяет его.

...