Что означают все эти параметры: spark.read.jdbc
относится к чтению таблицы из РСУБД.
параллелизм - это сила искры, для достижения которой вам необходимо упомянуть все эти параметры.
Вопрос [с]: -)
1) Документация указывает на то, что эти поля являются необязательными.Что произойдет, если я их не предоставлю?
Ответ: Параллелизм по умолчанию или плохой параллелизм
В зависимости от сценария разработчик должен позаботиться о стратегии настройки производительности.и для обеспечения разделения данных через границы (или разделы), которые, в свою очередь, будут параллельными задачами.Видя этот путь.
2) Как Spark знает, как разделить запросы?Насколько это будет эффективно?
jdbc-reads - ссылка на документы базы данных
Вы можете задать границы разделения на основе значений столбцов набора данных.
- Эти параметры определяют параллелизм при чтении.
- Все эти параметры необходимо указывать, если указан какой-либо из них.
Примечание
Эти параметры задают параллельность чтения таблицы .lowerBound
и upperBound
определяют шаг разбиения, но не фильтруют строки в таблице.Поэтому Spark разбивает и возвращает все строки в таблице.
Пример 1:
Вы можете разделить чтение таблицы по исполнителям в столбце emp_no
, используя partitionColumn
, lowerBound
, upperBound
и numPartitions
параметры.
val df = spark.read.jdbc(url=jdbcUrl,
table="employees",
columnName="emp_no",
lowerBound=1L,
upperBound=100000L,
numPartitions=100,
connectionProperties=connectionProperties)
также numPartitions
означает количество параллельных соединений, которые вы просите СУБД прочитать данные.если вы предоставляете numPartitions, то вы ограничиваете количество соединений ... не исчерпывая соединения на стороне СУБД ..
Источник примера 2: представление данных для загрузки данных оракула в cassandra :
val basePartitionedOracleData = sqlContext
.read
.format("jdbc")
.options(
Map[String, String](
"url" -> "jdbc:oracle:thin:username/password@//hostname:port/oracle_svc",
"dbtable" -> "ExampleTable",
"lowerBound" -> "1",
"upperBound" -> "10000",
"numPartitions" -> "10",
"partitionColumn" -> “KeyColumn"
)
)
.load()
Последние четыре аргумента в этой карте приведены с целью получения многораздельного набора данных.Если вы передаете какой-либо из них, вы должны передать все из них.
Когда вы передаете эти дополнительные аргументы, вот что это делает:
Этосоздает шаблон инструкций SQL в формате
SELECT * FROM {tableName} WHERE {partitionColumn} >= ? AND
{partitionColumn} < ?
Он отправляет {numPartitions
} операторов в механизм БД.Если вы добавили следующие значения: {dbTable = ExampleTable, lowerBound
= 1, upperBound
= 10 000, numPartitions
= 10, partitionColumn
= KeyColumn}, будут созданы следующие десять операторов:
SELECT * FROM ExampleTable WHERE KeyColumn >= 1 AND KeyColumn < 1001
SELECT * FROM ExampleTable WHERE KeyColumn >= 1001 AND KeyColumn < 2000
SELECT * FROM ExampleTable WHERE KeyColumn >= 2001 AND KeyColumn < 3000
SELECT * FROM ExampleTable WHERE KeyColumn >= 3001 AND KeyColumn < 4000
SELECT * FROM ExampleTable WHERE KeyColumn >= 4001 AND KeyColumn < 5000
SELECT * FROM ExampleTable WHERE KeyColumn >= 5001 AND KeyColumn < 6000
SELECT * FROM ExampleTable WHERE KeyColumn >= 6001 AND KeyColumn < 7000
SELECT * FROM ExampleTable WHERE KeyColumn >= 7001 AND KeyColumn < 8000
SELECT * FROM ExampleTable WHERE KeyColumn >= 8001 AND KeyColumn < 9000
SELECT * FROM ExampleTable WHERE KeyColumn >= 9001 AND KeyColumn < 10000
And then it would put the results of each of those queries in its own partition in Spark.
Вопрос [с]: -)
Если я определю эти параметры, как я могу гарантировать, что размеры разделов примерно равны, даже если для partitionColumn нетравномерно распределены?
Получат ли мои 1-й и 20-й исполнители большую часть работы, в то время как остальные 18 исполнителей будут сидеть в основном без дела?
Если да, есть ли способ предотвратить это?
На все вопросы есть один ответ
Ниже описан способ ... 1) Вам необходимо понять, сколько записей / строк на раздел .... на основе этого вы можете repartition
или coalesce
Фрагмент 1: Spark 1.6>
spark 2.x позволяет узнать, сколько записей в разделе.
spark_partition_id()
существует в org.apache.spark.sql.functions
import org.apache.spark.sql.functions._
val df = "<your dataframe read through rdbms.... using spark.read.jdbc>"
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show
Фрагмент 2: для всех версий Spark
df
.rdd
.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
.toDF("partition_number","NumberOfRecordsPerPartition")
.show
и затем вам нужно снова включить свою стратегию, запросить настройку между диапазонами или перераспределением и т. Д. ...., вы можете использовать mappartitions или foreachpartitions
Вывод: Я предпочитаю использовать данныеопции, которые работают с числовыми столбцами, так как я видел, что это делит данные на единообразные по границам / разделам.
Некоторое время может быть невозможно использовать эту опцию, тогда потребуется ручная настройка разделов / парлеллизма..
Обновление:
С помощью приведенного ниже мы можем добиться равномерного распределения ...
- Получить первичный ключ таблицы.
- Найдите ключевые минимальные и максимальные значения.
- Выполните Spark с этими значениями.
def main(args: Array[String]){
// parsing input parameters ...
val primaryKey = executeQuery(url, user, password, s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)
val result = executeQuery(url, user, password, s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
val min = result.getString(1).toInt
val max = result.getString(2).toInt
val numPartitions = (max - min) / 5000 + 1
val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()
var df = spark.read.format("jdbc").
option("url", s"${url}${config("schema")}").
option("driver", "com.mysql.jdbc.Driver").
option("lowerBound", min).
option("upperBound", max).
option("numPartitions", numPartitions).
option("partitionColumn", primaryKey).
option("dbtable", config("table")).
option("user", user).
option("password", password).load()
// some data manipulations here ...
df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)
}