Вы можете заключить блокирующие операции ввода-вывода в блок blocking
:
(0 to 1000).par.map{ i =>
blocking {
Thread.sleep(100)
Thread.activeCount()
}
}.max // yield 67 on my pc, while without blocking it's 10
Но вам следует задать себе вопрос, следует ли использовать параллельные наборы для операций ввода-вывода.Их вариант использования - выполнение сложной задачи процессора.
Я бы предложил вам рассмотреть возможность использования фьючерсов для вызовов ввода-вывода.
Вам также следует рассмотреть возможность использования пользовательского контекста выполнения для этой задачи, поскольку глобальный контекст выполнения является общедоступным одноэлементным, и вы не можете контролировать, какой код его использует и для какой цели.Вы могли бы легко голодать параллельные вычисления, созданные внешними библиотеками, если бы использовали все потоки из этого.
// or just use scala.concurrent.ExecutionContext.Implicits.global if you don't care
implicit val blockingIoEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
)
def fetchData(index: Int): Future[Int] = Future {
//if you use global ec, then it's required to mark computation as blocking to increase threads,
//if you use custom cached thread pool it should increase thread number even without it
blocking {
Thread.sleep(100)
Thread.activeCount()
}
}
val futures = (0 to 1000).map(fetchData)
Future.sequence(futures).onComplete {
case Success(data) => println(data.max) //prints about 1000 on my pc
}
Thread.sleep(1000)
EDIT
Существует также возможность использовать пользовательские ForkJoinPool с использованием ForkJoinTaskSupport :
import java.util.concurrent.ForkJoinPool //scala.concurrent.forkjoin.ForkJoinPool is deprecated
import scala.util.Random
import scala.collection.parallel
val fjpool = new ForkJoinPool(2)
val customTaskSupport = new parallel.ForkJoinTaskSupport(fjpool)
val numbers = List(1,2,3,4,5).par
numbers.tasksupport = customTaskSupport //assign customTaskSupport