Повышение уровня параллелизма операций scala .par - PullRequest
2 голосов
/ 11 июня 2019

Когда я вызываю par для коллекций, создается впечатление, что создается около 5-10 потоков, что хорошо для задач, связанных с процессором.

Но иногда у меня есть задачи, связанные с IO, и в этом случае я бы хотел, чтобы 500-1000 потоков извлекали из IO одновременно - выполнение 10-15 потоков очень медленное, и я вижу, что мои процессоры в основном бездействуют.

Как мне этого добиться?

1 Ответ

3 голосов
/ 11 июня 2019

Вы можете заключить блокирующие операции ввода-вывода в блок blocking:

(0 to 1000).par.map{ i =>
    blocking {
      Thread.sleep(100)
      Thread.activeCount()
    }
}.max // yield 67 on my pc, while without blocking it's 10

Но вам следует задать себе вопрос, следует ли использовать параллельные наборы для операций ввода-вывода.Их вариант использования - выполнение сложной задачи процессора.

Я бы предложил вам рассмотреть возможность использования фьючерсов для вызовов ввода-вывода.

Вам также следует рассмотреть возможность использования пользовательского контекста выполнения для этой задачи, поскольку глобальный контекст выполнения является общедоступным одноэлементным, и вы не можете контролировать, какой код его использует и для какой цели.Вы могли бы легко голодать параллельные вычисления, созданные внешними библиотеками, если бы использовали все потоки из этого.

// or just use scala.concurrent.ExecutionContext.Implicits.global if you don't care
implicit val blockingIoEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(
    Executors.newCachedThreadPool()
) 

def fetchData(index: Int): Future[Int] =  Future {
   //if you use global ec, then it's required to mark computation as blocking to increase threads,
   //if you use custom cached thread pool it should increase thread number even without it
    blocking { 
      Thread.sleep(100)
      Thread.activeCount()
    }
}

val futures = (0 to 1000).map(fetchData)

Future.sequence(futures).onComplete {
    case Success(data) => println(data.max) //prints about 1000 on my pc
}

Thread.sleep(1000)

EDIT

Существует также возможность использовать пользовательские ForkJoinPool с использованием ForkJoinTaskSupport :

import java.util.concurrent.ForkJoinPool //scala.concurrent.forkjoin.ForkJoinPool is deprecated
import scala.util.Random
import scala.collection.parallel

val fjpool = new ForkJoinPool(2) 
val customTaskSupport = new parallel.ForkJoinTaskSupport(fjpool) 

val numbers = List(1,2,3,4,5).par 

numbers.tasksupport = customTaskSupport //assign customTaskSupport
...