Я хочу начать с одного совета при задании вопросов - вы всегда должны предоставить MCVE . Ваш код не компилируется; например, я понятия не имею, откуда взялся UnitFuture
, понятия не имею, что такое реализация sequence
, которую вы используете и т. д.
Вот фрагмент кода, который работает со стандартным Scala. Сначала объяснение:
Метод countWords
принимает список строк для подсчета, а также две службы - одну для обработки фьючерсов Java в разных потоках и одну для обработки фьючерсов Scala в разных потоках. Scala one является производной от Java one методом ExecutionContext.fromExecutor
.
Почему и Java, и Scala? Ну, я хотел сохранить Java, потому что именно так вы изначально писали свой код, но я не знаю, как sequence
Java Future. Итак, что я сделал:
- для каждой подстроки:
- форк задача Java Future
- превратить его в будущее Scala
- последовательность полученного списка Scala Futures
Если вы не знакомы с последствиями, вы это сделаете (если вы собираетесь работать со Scala). Здесь я использовал контекст выполнения неявно, потому что он удаляет много стандартного шаблона - таким образом, мне не нужно явно передавать его при преобразовании в будущее Scala, при отображении / секвенировании и т. Д.
А теперь сам код:
import java.util.concurrent.{Callable, ExecutorService, Executors}
import java.util.concurrent.{Future => JFuture}
import scala.concurrent.{ExecutionContext, Future}
def scalaFromJavaFuture[A](
javaFuture: JFuture[A]
)(implicit ec: ExecutionContext): Future[A] =
Future { javaFuture.get }(ec)
def fork(s: String)(es: ExecutorService): java.util.concurrent.Future[Int] =
es.submit(new Callable[Int] {
def call = {
println(s"Thread: ${Thread.currentThread()}, processing string: $s")
s.split(" ").size
}
})
def countWords(l: List[String])(es: ExecutorService)(implicit ec: ExecutionContext): Future[Int] = {
val listOfFutures = l.map(elem => scalaFromJavaFuture(fork(elem)(es)))
Future.sequence(listOfFutures).map(_.sum)
}
val listPar = List("ab cd", "hg ks", "lh ks", "lh hs")
val es = Executors.newFixedThreadPool(4)
implicit val ec = ExecutionContext.fromExecutor(es)
val counts = countWords(listPar)(es)
counts.onComplete(println)
Пример вывода:
Тема: Thread [pool-1-thread-1,5, main], строка обработки: ab cd
Тема: Thread [pool-1-thread-3,5, main], строка обработки: hg ks
Тема: Thread [pool-1-thread-2,5, main], строка обработки: lh ks
Тема: Thread [pool-1-thread-4,5, main], строка обработки: lh hs
Успех (8)
Обратите внимание: определение контекста зависит от контекста выполнения. Запустите его пару раз, и вы убедитесь в этом сами. используются только две темы:
Тема: Thread [pool-1-thread-1,5, main], строка обработки: ab cd
Тема: Thread [pool-1-thread-3,5, main], строка обработки: hg ks
Thread: Thread [pool-1-thread-1,5, main], строка обработки: lh ks
Тема: Thread [pool-1-thread-1,5, main], строка обработки: lh hs
Успех (8)