Когда вы делаете
val df = (1 to 8).toSeq.toDF.repartition(8)
Это не создаст 8 разделов с 1 записью каждый. Если вы проверите этот фрейм данных (см., Например, https://stackoverflow.com/a/46032600/1138523),, то получите:
+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
| 0| 0|
| 1| 0|
| 2| 0|
| 3| 0|
| 4| 0|
| 5| 0|
| 6| 4|
| 7| 4|
+----------------+-----------------+
Таким образом, у вас будет только 2 непустых раздела, поэтому у вас будет максимум 2-кратный параллелизм (я спрашивал об этом здесь: Как работает разбиение Round Robin в Spark? )
Для создания разделов одинакового размера лучше использовать
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
вместо
(1 to 8).toSeq.toDF.repartition(8).rdd
Первая опция дает вам 1 запись на раздел, вторая не так, как она использует циклическое разбиение
В качестве примечания: если вы введете x.foreach
, то будет использовано x
(итераторы доступны только один раз), поэтому, если вы вернете x
, вы всегда получите пустой итератор.
Итак, ваш окончательный код может выглядеть так:
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.mapPartitions(
x => {
val xL = x.toList // convert to List
assert(xL.size==1) // make sure partition has only 1 record
val conn = createConnection()
xL.foreach{
s => { // expect the below query be run concurently
execute(s"SELECT * FROM myTable WHERE col = ${s}")
}
}
conn.close()
xL.toIterator
})
.collect // trigger all queries
Вместо использования mapPartitions
(что лениво), вы также можете использовать foreachPartition
, что не лениво
Поскольку у вас есть только 1 запись на раздел, перебор разделов не очень полезен, вы также можете просто использовать простой foreach
:
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.foreach( s=> {
val conn = createConnection()
execute(s"SELECT * FROM myTable WHERE col = ${s}")
conn.close()
})