Неявный класс, содержащий изменяемые переменные в многопоточной среде - PullRequest
0 голосов
/ 25 января 2019

Мне нужно реализовать метод parallel, который принимает два вычислительных блока a и b и запускает каждый из них в новом потоке. Метод должен возвращать кортеж с результирующими значениями обоих вычислений. Он должен иметь следующую подпись:

def parallel[A, B](a: => A, b: => B): (A, B)

Мне удалось выполнить упражнение, используя прямой Java-подобный подход. Тогда я решил составить решение с неявным классом. Вот оно:

object ParallelApp extends App {

  implicit class ParallelOps[A](a: => A) {
    var result: A = _

    def spawn(): Unit = {

      val thread = new Thread {
        override def run(): Unit = {
          result = a
        }
      }
      thread.start()
      thread.join()
    }
  }

  def parallel[A, B](a: => A, b: => B): (A, B) = {
    a.spawn()
    b.spawn()
    (a.result, b.result)

  }

  println(parallel(1 + 2, "a" + "b"))

}

По неизвестной причине я получаю вывод (null,null). Не могли бы вы указать мне, где проблема?

Ответы [ 2 ]

0 голосов
/ 25 января 2019

Оповещение о спойлере: Это не сложно. Это забавно, как волшебная уловка (если вы считаете чтение документации о Java Memory Model «забавным»). Если вы еще не поняли, я настоятельно рекомендую попытаться выяснить это, иначе это не будет смешно. Кто-то должен сделать из нее "деление на ноль, доказывающее, что 2 = 4" .


Рассмотрим следующий более короткий пример:

implicit class Foo[A](a: A) {
  var result: String = "not initialized"
  def computeResult(): Unit = result = "Yay, result!"
}

val a = "a string"
a.computeResult()

println(a.result)

При запуске печатается

not initialized

несмотря на то, что мы вызвали computeResult() и установили result на "Yay, result!". Проблема в том, что два вызова a.computeResult() и a.result принадлежат двум совершенно независимым экземплярам Foo. Неявное преобразование выполняется дважды, и второй неявно созданный объект ничего не знает об изменениях в первом неявно созданном объекте. Это никак не связано с потоками или JMM.

Кстати: ваш код не параллелен. Вызов join сразу после вызова start ничего не принесет, ваш основной поток просто простаивает и будет ждать завершения другого потока. Ни в коем случае не будет двух потоков, которые выполняют какую-либо полезную работу одновременно.

0 голосов
/ 25 января 2019

РЕДАКТИРОВАТЬ: Исправлена ​​ошибка, указанная Андреем Тюкиным

Одним из способов решения вашей проблемы является использование Scala Futures

Документация . Учебник . Полезный блог Кланга .

Обычно вам потребуется комбинация этих библиотек:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.duration._

асинхронный пример:

def parallelAsync[A,B](a: => A, b: => B): Future[(A,B)] = {
  // as per Andrey Tyukin's comments, this line runs
  // the two futures sequentially and we do not get
  // any benefit from it.  I will leave this line here
  // so others will not fall in my trap
  //for {i <- Future(a); j <- Future(b) } yield (i,j)
  Future(a) zip Future(b)
}

parallelAsync(1 + 2, "a" + "b").onComplete {
  case Success(x) => println(x)
  case Failure(e) => e.printStackTrace()
}

Если вам нужно заблокировать, пока оба не будут завершены, вы можете использовать это:

def parallelSync[A,B](a: => A, b: => B): (A,B) = {
  // see comment above
  //val f = for { i <- Future(a); j <- Future(b) } yield (i,j)
  val tuple = Future(a) zip Future(b)
  Await.result(tuple, 5 second)
}

println(parallelSync(3 + 4, "c" + "d"))

При запуске этих маленьких примеров не забудьте немного поспать в конце, чтобы программа не закончилась, пока результаты не вернутся

Thread.sleep(3000)
...