Невозможно выполнить параллельные вычисления с функциональным API - PullRequest
1 голос
/ 28 апреля 2019

Я пытаюсь реализовать функцию countWords из красной книги в главе о параллелизме. Когда я передаю пул потоков функции и изменяю функцию для печати потока, считающего слова, я вижу только напечатанный основной поток. Это указывает на то, что я не могу заставить эту функцию выполняться параллельно.

Что у меня сейчас есть:

type Par[A] = ExecutorService => Future[A]

def asyncF[A, B](f: A => B): A => Par[B] = a => lazyUnit(f(a))

def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

def unit[A](a: A): Par[A] = (_: ExecutorService) => UnitFuture(a)

def fork[A](a: => Par[A]): Par[A] = 
es => es.submit(new Callable[A] {
  def call = a(es).get
})

def countWords(l: List[String]): Par[Int] = map(sequence(l.map(asyncF {
println(Thread.currentThread())
s => s.split(" ").length
})))(_.sum)

Когда я бегу:

val listPar = List("ab cd", "hg ks", "lh ks", "lh hs")

val es = Executors.newFixedThreadPool(4)

val counts = countWords(listPar)(es)

println(counts.get(100, SECONDS))

Я получаю:

Thread[main,5,main]
8

Я ожидаю увидеть поток, напечатанный для каждого элемента списка (так как имеется четыре элемента и пул потоков размером 4), однако я могу видеть только напечатанный основной поток.

Есть предложения? Спасибо

1 Ответ

1 голос
/ 29 апреля 2019

Я хочу начать с одного совета при задании вопросов - вы всегда должны предоставить MCVE . Ваш код не компилируется; например, я понятия не имею, откуда взялся UnitFuture, понятия не имею, что такое реализация sequence, которую вы используете и т. д.

Вот фрагмент кода, который работает со стандартным Scala. Сначала объяснение:

Метод countWords принимает список строк для подсчета, а также две службы - одну для обработки фьючерсов Java в разных потоках и одну для обработки фьючерсов Scala в разных потоках. Scala one является производной от Java one методом ExecutionContext.fromExecutor.

Почему и Java, и Scala? Ну, я хотел сохранить Java, потому что именно так вы изначально писали свой код, но я не знаю, как sequence Java Future. Итак, что я сделал:

  • для каждой подстроки:
    • форк задача Java Future
    • превратить его в будущее Scala
  • последовательность полученного списка Scala Futures

Если вы не знакомы с последствиями, вы это сделаете (если вы собираетесь работать со Scala). Здесь я использовал контекст выполнения неявно, потому что он удаляет много стандартного шаблона - таким образом, мне не нужно явно передавать его при преобразовании в будущее Scala, при отображении / секвенировании и т. Д.

А теперь сам код:

import java.util.concurrent.{Callable, ExecutorService, Executors}
import java.util.concurrent.{Future => JFuture}

import scala.concurrent.{ExecutionContext, Future}

def scalaFromJavaFuture[A](
  javaFuture: JFuture[A]
)(implicit ec: ExecutionContext): Future[A] =
  Future { javaFuture.get }(ec)

def fork(s: String)(es: ExecutorService): java.util.concurrent.Future[Int] =
  es.submit(new Callable[Int] {
    def call = {
      println(s"Thread: ${Thread.currentThread()}, processing string: $s")
      s.split(" ").size
    }
  })

def countWords(l: List[String])(es: ExecutorService)(implicit ec: ExecutionContext): Future[Int] = {
  val listOfFutures = l.map(elem => scalaFromJavaFuture(fork(elem)(es)))
  Future.sequence(listOfFutures).map(_.sum)
}

val listPar = List("ab cd", "hg ks", "lh ks", "lh hs")

val es = Executors.newFixedThreadPool(4)
implicit val ec = ExecutionContext.fromExecutor(es)

val counts = countWords(listPar)(es)

counts.onComplete(println)

Пример вывода:

Тема: Thread [pool-1-thread-1,5, main], строка обработки: ab cd
Тема: Thread [pool-1-thread-3,5, main], строка обработки: hg ks
Тема: Thread [pool-1-thread-2,5, main], строка обработки: lh ks
Тема: Thread [pool-1-thread-4,5, main], строка обработки: lh hs
Успех (8)

Обратите внимание: определение контекста зависит от контекста выполнения. Запустите его пару раз, и вы убедитесь в этом сами. используются только две темы:

Тема: Thread [pool-1-thread-1,5, main], строка обработки: ab cd
Тема: Thread [pool-1-thread-3,5, main], строка обработки: hg ks
Thread: Thread [pool-1-thread-1,5, main], строка обработки: lh ks
Тема: Thread [pool-1-thread-1,5, main], строка обработки: lh hs
Успех (8)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...