Spark распределяет задания по нескольким исполнителям - PullRequest
0 голосов
/ 08 января 2019

Я бы хотел выполнять SQL-запрос параллельно и иметь возможность контролировать уровень параллелизма для 8 запросов. Прямо сейчас я делаю этот кусок кода. Идея состоит в том, чтобы создать 8 разделов и позволить исполнителям запускать их параллельно.

  (1 to 8).toSeq.toDF.repartition(8) // 8 partitions
  .rdd.mapPartitions(
  x => {
  val conn = createConnection()
    x.foreach{
      s => { // expect the below query be run concurently
      execute(s"SELECT * FROM myTable WHERE col = ${s.get(0)}")
      }
    }
  conn.close()
  x
  }).take(1)

Проблема в том, что 8 запросов выполняются один за другим.

Как мне выполнить запросы 8 на 8?

1 Ответ

0 голосов
/ 09 января 2019

Когда вы делаете

val df = (1 to 8).toSeq.toDF.repartition(8)

Это не создаст 8 разделов с 1 записью каждый. Если вы проверите этот фрейм данных (см., Например, https://stackoverflow.com/a/46032600/1138523),, то получите:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|               0|                0|
|               1|                0|
|               2|                0|
|               3|                0|
|               4|                0|
|               5|                0|
|               6|                4|
|               7|                4|
+----------------+-----------------+

Таким образом, у вас будет только 2 непустых раздела, поэтому у вас будет максимум 2-кратный параллелизм (я спрашивал об этом здесь: Как работает разбиение Round Robin в Spark? )

Для создания разделов одинакового размера лучше использовать

spark.sparkContext.parallelize((0 to 7), numSlices = 8)

вместо

(1 to 8).toSeq.toDF.repartition(8).rdd

Первая опция дает вам 1 запись на раздел, вторая не так, как она использует циклическое разбиение

В качестве примечания: если вы введете x.foreach, то будет использовано x (итераторы доступны только один раз), поэтому, если вы вернете x, вы всегда получите пустой итератор.

Итак, ваш окончательный код может выглядеть так:

 spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.mapPartitions(
  x => {
  val xL = x.toList  // convert to List
  assert(xL.size==1) // make sure partition has only 1 record

  val conn = createConnection()
    xL.foreach{
      s => { // expect the below query be run concurently
      execute(s"SELECT * FROM myTable WHERE col = ${s}")
      }
    }
  conn.close()
  xL.toIterator
  })
 .collect // trigger all queries

Вместо использования mapPartitions (что лениво), вы также можете использовать foreachPartition, что не лениво

Поскольку у вас есть только 1 запись на раздел, перебор разделов не очень полезен, вы также можете просто использовать простой foreach:

 spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.foreach( s=> {
  val conn = createConnection()
  execute(s"SELECT * FROM myTable WHERE col = ${s}")   
  conn.close()
})
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...