Тупики с java.util.concurrent._ в Scala в REPL - PullRequest
0 голосов
/ 27 января 2019

Я сталкивался со следующим сценарием при изучении книги Пола Кьюзано и Рунара Бьянарсона «Функциональное программирование в Scala» (гл. 7 - Чисто функциональный параллелизм).

    package fpinscala.parallelism

    import java.util.concurrent._
    import language.implicitConversions


    object Par {
      type Par[A] = ExecutorService => Future[A]

      def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

      def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a) // `unit` is represented as a function that returns a `UnitFuture`, which is a simple implementation of `Future` that just wraps a constant value. It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled. Its `get` method simply returns the value that we gave it.

      private case class UnitFuture[A](get: A) extends Future[A] {
        def isDone = true
        def get(timeout: Long, units: TimeUnit) = get
        def isCancelled = false
        def cancel(evenIfRunning: Boolean): Boolean = false
      }

      def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = // `map2` doesn't evaluate the call to `f` in a separate logical thread, in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism. We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
        (es: ExecutorService) => {
          val af = a(es)
          val bf = b(es)
          UnitFuture(f(af.get, bf.get)) // This implementation of `map2` does _not_ respect timeouts. It simply passes the `ExecutorService` on to both `Par` values, waits for the results of the Futures `af` and `bf`, applies `f` to them, and wraps them in a `UnitFuture`. In order to respect timeouts, we'd need a new `Future` implementation that records the amount of time spent evaluating `af`, then subtracts that time from the available time allocated for evaluating `bf`.
        }

      def fork[A](a: => Par[A]): Par[A] = // This is the simplest and most natural implementation of `fork`, but there are some problems with it--for one, the outer `Callable` will block waiting for the "inner" task to complete. Since this blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`, this implies that we're losing out on some potential parallelism. Essentially, we're using two threads when one should suffice. This is a symptom of a more serious problem with the implementation, and we will discuss this later in the chapter.
        es => es.submit(new Callable[A] {
          def call = a(es).get
        })

      def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

 def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
    p(e).get == p2(e).get

}

Вы можете найти оригинальный кодна Github здесь .См. здесь для документации java.util.concurrent.

Я обеспокоен реализацией fork.В частности, якобы fork может привести к тупикам, когда ThreadPool слишком мал.

Я рассматриваю следующий пример:

val a = Par.lazyUnit(42 + 1)
val es: ExecutorService = Executors.newFixedThreadPool(2)
println(Par.fork(a)(es).get)  

Я не ожидал бы, что этот пример закончится втупик как есть две темы.Тем не менее, это происходит на моем компьютере, когда я запускаю его в Scala REPL.Почему это так?

Вывод при инициализации ExecutorService: es: java.util.concurrent.ExecutorService =

java.util.concurrent.ThreadPoolE
xecutor@73a86d72[Running, pool size = 0, active threads = 0, queued tasks =
 0, completed tasks = 0]

Правильно ли здесь pool size = 0?Другими словами, это проблема непонимания java.util.concurrent._ или проблема непонимания части Scala?

1 Ответ

0 голосов
/ 01 февраля 2019

ОК, после долгого расследования я думаю, что у меня есть ответ.Полная история длинна, но я постараюсь сократить ее, упрощая и избегая многих деталей.

Примечание : Потенциально Scala может быть скомпилирована для различных целевых платформ, но эта конкретная проблема возникла наJava / JVM как цель, так что это то, о чем этот ответ.

Взаимная блокировка, которую вы видите, не имеет никакого отношения к размеру пула потоков.На самом деле это внешний fork звонок, который зависает.Это связано с сочетанием деталей реализации REPL и многопоточности, но чтобы понять, как это происходит, нужно изучить несколько частей:

  • как работает Scala REPL
  • как компилируется Scala object с Java / JVM
  • как Scala эмулирует параметры по имени в Java / JVM
  • как Java / JVM запускает статические инициализаторы классов

Краткая (er) версия (см. Также Summary в конце) состоит в том, что этот код зависает под REPL, потому что когда он выполняется REPL, он логически похож на следующий код:

object DeadLock {

  import scala.concurrent._
  import scala.concurrent.duration.Duration
  import scala.concurrent.ExecutionContext.Implicits.global

  val foo: Int = Await.result(Future(calc()), Duration.Inf)

  def printFoo(): Unit = {
    println(s"Foo = $foo")
  }

  private def calc(): Int = {
    println("Before calc")
    42
  }
}


def test(): Unit = {
  println("Before printFoo")
  DeadLock.printFoo()
  println("After printFoo")
} 

или очень похожий в мире Java:

class Deadlock {
    static CompletableFuture<Integer> cf;
    static int foo;

    public static void printFoo() {
        System.out.println("Print foo " + foo);
    }

    static {
        cf = new CompletableFuture<Integer>();
        new Thread(new Runnable() {
            @Override
            public void run() {
                calcF();
            }
        }).start();
        try {
            foo = cf.get();
            System.out.println("Future result = " + cf.get());
        } catch (InterruptedException e) {
            e.printStackTrace();f
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }


    private static void calcF() {
        cf.complete(42);
    }
}

public static void main(String[] args) {
    System.out.println("Before foo");
    Deadlock.printFoo();
    System.out.println("After foo");
}

Если вам понятно, почему этот код заходит в тупик, вы уже знаете большую часть истории и, вероятно, сможете сами вывести остальную часть.Вы можете просто взглянуть на раздел Summary в конце.

Как статический инициализатор Java может зайти в тупик?

Давайте начнем с конца этогоистория: почему код Java висит?Это происходит из-за двух гарантий Java / JVM для статического инициализатора (подробнее см. Раздел 12.4.2. Подробная процедура инициализации JLS):

  • статический инициализатор будет запущен перед любым другим «внешним» использованием класса

  • статический инициализатор будет запущен ровно один раз, и это будет выполнено с помощью глобальной блокировки

Блокировка, используемая для статического инициализатора, является неявной и управляется JVM, но она есть.Это означает, что код логически похож на что-то вроде этого:

class Deadlock {

    static boolean staticInitFinished = false;
    // unique value for each thread!
    static ThreadLocal<Boolean> currentThreadRunsStaticInit = ThreadLocal.withInitial(() -> Boolean.FALSE);


    static CompletableFuture<Integer> cf;
    static int foo;

    static void enforceStaticInit() {
        synchronized (Deadlock.class) {
            // is init finished?
            if (staticInitFinished)
                return;
            // are we the thread already running the init?
            if(currentThreadRunsStaticInit.get())
                return;
            currentThreadRunsStaticInit.set(true);

            cf = new CompletableFuture<Integer>();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    calcF();
                }
            }).start();
            try {
                foo = cf.get();
                System.out.println("Future result = " + cf.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            currentThreadRunsStaticInit.set(false);
            staticInitFinished = true;
        }
    }

    private static void calcF() {
        enforceStaticInit();
        cf.complete(42);
    }

    public static void printFoo() {
        enforceStaticInit();
        System.out.println("Print foo " + foo);
    }
}

Теперь совершенно ясно, почему этот код блокируется: наш статический инициализатор запускает новый поток и блокирует ожидание результата из него.Но этот новый поток пытается получить доступ к тому же классу (метод calcF) и, будучи другим потоком, должен ждать завершения уже запущенного статического инициализатора.Обратите внимание, что если бы метод calcF находился в другом классе, все работало бы отлично.

Как работает Scala REPL

Теперь давайте вернемся к началурассказ о том, как работает Scala REPL.Этот ответ является большим упрощением реальной сделки, но он отражает важные для этой ситуации детали.К счастью для разработчиков REPL, компилятор Scala написан на Scala.Это означает, что REPL не должен каким-то образом интерпретировать код, он может запустить его через стандартный компилятор и затем запустить скомпилированный код через Java Reflection API.Это все еще требует некоторой декорации кода, чтобы сделать компилятор счастливым и вернуть результаты.

Немного (или ну, очень) упрощая его, когда вы набираете что-то вроде

val a = Par.lazyUnit(42 + 1)

в REPL код анализируется и преобразуется во что-то вроде этого:

package line3

object read {
    val a = Par.lazyUnit(42 + 1)
    val res3 = a
}

object eval {
    def print() = {
        println("a: Par.Par[Int] = " + read.res3)
    }
}

, а затем line3.eval.print() вызывается через отражение.

Подобная история случается для:

val es: ExecutorService = Executors.newFixedThreadPool(2)

и, наконец, когда вы делаете

Par.fork(a)(es).get

все становится немного интереснее, потому что вы зависите от предыдущегостроки, которые реализованы с использованием import s:

package line5

object read {
    import line2.read.Par
    import line3.read.a
    import line4.read.es

    val res5 = Par.fork(a)(es).get
}

object eval {
    def print() = {
        println("res5: Int = " + read.res5)
    }
}

Важная вещь здесь заключается в том, что все, что вы пишете в REPL, обернуто в совершенно новый object, а затем скомпилирован и запущен как обычный код.

Как Scala эмулирует параметры по имени в Java / JVM

В определении метода fork используется параметр по имени :

def fork[A](a: => Par[A]): Par[A] =

Здесь он используется для ленивой оценки a, что имеет решающее значение для всей логики fork.Java / JVM не имеет стандартной поддержки для отложенной оценки, но ее можно эмулировать, и именно это делает компилятор Scala.Внутренне подпись изменяется на использование Function0:

def fork[A](aWrapper: () => Par[A]): Par[A] = 

, и каждый доступ к a заменяется вызовом aWrapper.apply().Другая часть магии происходит на стороне вызывающего метода с параметром по имени: там параметр также должен быть заключен в Function0, чтобы код стал чем-то вроде

object read {
    import line2.read.Par
    import line3.read.a
    import line4.read.es

    val res5 = Par.fork(() => a)(es).get
}

Но на самом деле этонемного по-другому.Наивно это потребовало бы другого класса только для этой маленькой функции, и это кажется расточительным для такой простой логики.На практике в Scala 2.12 используется магия Java 8 LambdaMetafactory , поэтому код действительно становится чем-то вроде

object read {
    import line2.read.Par
    import line3.read.a
    import line4.read.es

    def aWrapper():Int = a

    val res5 = Par.fork(aWrapper _)(es).get
}

, где aWrapper _ означает преобразование метода в Funciton0это делается с помощью LambdaMetafactory.Как вы можете догадаться из главы о взаимоблокировке статического инициализатора Java, введение def aWrapper является принципиальным отличием .Вы уже можете видеть, что этот код очень похож на первый фрагмент кода Scala в ответе, который зависает.

Как Scala компилирует object на Java / JVM

Последний фрагмент головоломки - это то, как Scala object компилируется в Java / JVM.Ну, на самом деле он скомпилирован во что-то похожее на «статический класс», но поскольку вы можете использовать object в качестве параметра объекта, это должно быть немного сложнее.В действительности вся логика инициализации перемещается в конструктор класса object, и существует простой статический инициализатор, который вызывает его.Таким образом, наш последний read объект в Java (без учета import s) будет выглядеть так:

class read$ {
    static read$ MODULE$

    static {
        new read$()
    }

    private Par[Int] res5;

    private read$() {
        MODULE$ = this;
        res5 = Par.fork(read$::aWrapper)(es).get
    }

    private static int aWrapper(){
        return line3.read$.MODULE$.a;
    }
}

и здесь read$::aWrapper означает создание Function0 формы aWrapper метода с использованиемLambdaMetafactory.Другими словами, инициализация Scala object преобразуется в код, который выполняется как часть статического инициализатора Java .

Сводка

Подводя итог, как все облажается:

  • REPL превращает ваш код в новый object для каждой строки и ее компиляции

  • object логика инициализации преобразуется в статическую логику инициализации Java

  • вызов метода сПараметр by-name в простых случаях переводится в метод, заключающий в себе логику «вернуть значение», и этот метод добавляется к тому же class или object

  • Par.fork будучи выполненным как часть инициализации object (то есть частью статического инициализатора Java) пытается оценить параметр по имени (то есть вызывает метод в том же классе) в другом потоке и блокирует ожидание результатаэтого потока

  • Статическим инициализатором Java является логикавыполняется только под глобальной блокировкой, поэтому блокирует другой поток, вызывающий метод.Но он сам блокируется в ожидании завершения вызова этого метода.

...