Я хочу запустить несколько параллельных SQL в одном кластере, чтобы я мог использовать весь ресурсный кластер в ширину кластера. Я использую sqlContext.sql (запрос).
Я видел пример кода здесь , как следует,
val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map(query => {
Future{
//spark stuff here
0
}(ec)
})
val allDone: Future[Seq[Int]] = Future.sequence(results)
//wait for results
Await.result(allDone, scala.concurrent.duration.Duration.Inf)
executor.shutdown //otherwise jvm will probably not exit
Как я понял, ExecutionContext вычисляет доступные ядра в машине (используя ForkJoinPool) и соответственно выполняет параллелизм. Но что произойдет, если мы рассмотрим искровой кластер, отличный от отдельной машины, и как он может гарантировать полное использование ресурсов кластера .?
Например: если у меня есть кластер из 10 узлов с каждыми 4 ядрами, то как вышеприведенный код гарантирует, что 40 ядер будут использованы.
редактирует: -
Допустим, нужно выполнить 2 sql, у нас есть 2 способа сделать это
отправлять запросы последовательно, так что второй запрос будет выполнен только после выполнения первого. (потому что sqlContext.sql (запрос) является синхронным вызовом)
Отправьте оба запроса параллельно, используя Futures, чтобы оба запроса выполнялись независимо и параллельно в кластере
при условии, что ресурсов достаточно (в обоих случаях).
Я думаю, что второй лучше, потому что он использует максимальные ресурсы, доступные в кластере, и если первый запрос полностью использовал ресурсы, планировщик будет ожидать завершения задания (в зависимости от политики), что справедливо в этом случай.
Но, как отметил пользователь9613318, «увеличение размера пула приведет к насыщению драйвера»
Тогда как я могу эффективно контролировать потоки для лучшего использования ресурсов.