JDBC для Spark Dataframe - Как обеспечить равномерное разбиение? - PullRequest
2 голосов
/ 11 июня 2019

Я новичок в Spark и работаю над созданием DataFrame из таблицы базы данных Postgres через JDBC, используя spark.read.jdbc.

Меня немного смущают параметры разделения, в частности partitionColumn , lowerBound , upperBound и numPartitions .


  • Документация указывает на то, что эти поля являются необязательными. Что произойдет, если я не предоставлю их?
  • Как Spark знает, как разделить запросы? Насколько это будет эффективно?
  • Если я определю эти параметры, как я могу гарантировать, что размеры разделов примерно равны, даже если partitionColumn распределен неравномерно?

Допустим, у меня будет 20 исполнителей, поэтому я установил для numPartitions значение 20.
Мой partitionColumn представляет собой поле идентификатора с автоинкрементом, и, скажем, значения находятся в диапазоне от 1 до 2 000 000
. Однако, поскольку пользователь выбирает для обработки некоторые действительно старые данные, а также некоторые действительно новые данные, причем в середине ничего нет, большинство данных имеют значения идентификаторов либо менее 100 000, либо более 1 900 000.

  • Получат ли мои 1-й и 20-й исполнители большую часть работы, в то время как остальные 18 исполнителей в основном сидят без дела?

  • Если так, есть ли способ предотвратить это?

Ответы [ 2 ]

1 голос
/ 13 июня 2019

Я нашел способ вручную указать границы раздела, используя конструктор jdbc с параметром предикатов .

Это позволяет вам явно указывать отдельные условия для вставки в "условие «where» для каждого раздела, которое позволяет вам точно указать, какой диапазон строк будет получать каждый раздел.Поэтому, если у вас нет равномерно распределенного столбца для автоматического разделения, вы можете настроить собственную стратегию разделения.

Пример ее использования можно найти в принятом ответе на этот вопрос .

0 голосов
/ 11 июня 2019

Что означают все эти параметры: spark.read.jdbc относится к чтению таблицы из РСУБД.

параллелизм - это сила искры, для достижения которой вам необходимо упомянуть все эти параметры.

Вопрос [с]: -)

1) Документация указывает на то, что эти поля являются необязательными.Что произойдет, если я их не предоставлю?

Ответ: Параллелизм по умолчанию или плохой параллелизм

В зависимости от сценария разработчик должен позаботиться о стратегии настройки производительности.и для обеспечения разделения данных через границы (или разделы), которые, в свою очередь, будут параллельными задачами.Видя этот путь.

2) Как Spark знает, как разделить запросы?Насколько это будет эффективно?

jdbc-reads - ссылка на документы базы данных

Вы можете задать границы разделения на основе значений столбцов набора данных.

  • Эти параметры определяют параллелизм при чтении.
  • Все эти параметры необходимо указывать, если указан какой-либо из них.

Примечание

Эти параметры задают параллельность чтения таблицы .lowerBound и upperBound определяют шаг разбиения, но не фильтруют строки в таблице.Поэтому Spark разбивает и возвращает все строки в таблице.

Пример 1:
Вы можете разделить чтение таблицы по исполнителям в столбце emp_no, используя partitionColumn, lowerBound, upperBound и numPartitions параметры.

val df = spark.read.jdbc(url=jdbcUrl,
    table="employees",
    columnName="emp_no",
    lowerBound=1L,
    upperBound=100000L,
    numPartitions=100,
    connectionProperties=connectionProperties)

также numPartitions означает количество параллельных соединений, которые вы просите СУБД прочитать данные.если вы предоставляете numPartitions, то вы ограничиваете количество соединений ... не исчерпывая соединения на стороне СУБД ..

Источник примера 2: представление данных для загрузки данных оракула в cassandra :

val basePartitionedOracleData = sqlContext
.read
.format("jdbc")
.options(
Map[String, String](
"url" -> "jdbc:oracle:thin:username/password@//hostname:port/oracle_svc",
"dbtable" -> "ExampleTable",
"lowerBound" -> "1",
"upperBound" -> "10000",
"numPartitions" -> "10",
"partitionColumn" -> “KeyColumn"
)
)
.load()

Последние четыре аргумента в этой карте приведены с целью получения многораздельного набора данных.Если вы передаете какой-либо из них, вы должны передать все из них.

Когда вы передаете эти дополнительные аргументы, вот что это делает:

Этосоздает шаблон инструкций SQL в формате

SELECT * FROM {tableName} WHERE {partitionColumn} >= ? AND
{partitionColumn} < ?

Он отправляет {numPartitions} операторов в механизм БД.Если вы добавили следующие значения: {dbTable = ExampleTable, lowerBound = 1, upperBound = 10 000, numPartitions = 10, partitionColumn = KeyColumn}, будут созданы следующие десять операторов:

SELECT * FROM ExampleTable WHERE KeyColumn >= 1 AND KeyColumn < 1001
SELECT * FROM ExampleTable WHERE KeyColumn >= 1001 AND KeyColumn < 2000
SELECT * FROM ExampleTable WHERE KeyColumn >= 2001 AND KeyColumn < 3000
SELECT * FROM ExampleTable WHERE KeyColumn >= 3001 AND KeyColumn < 4000
SELECT * FROM ExampleTable WHERE KeyColumn >= 4001 AND KeyColumn < 5000
SELECT * FROM ExampleTable WHERE KeyColumn >= 5001 AND KeyColumn < 6000
SELECT * FROM ExampleTable WHERE KeyColumn >= 6001 AND KeyColumn < 7000
SELECT * FROM ExampleTable WHERE KeyColumn >= 7001 AND KeyColumn < 8000
SELECT * FROM ExampleTable WHERE KeyColumn >= 8001 AND KeyColumn < 9000
SELECT * FROM ExampleTable WHERE KeyColumn >= 9001 AND KeyColumn < 10000
And then it would put the results of each of those queries in its own partition in Spark.

Вопрос [с]: -)

Если я определю эти параметры, как я могу гарантировать, что размеры разделов примерно равны, даже если для partitionColumn нетравномерно распределены?

Получат ли мои 1-й и 20-й исполнители большую часть работы, в то время как остальные 18 исполнителей будут сидеть в основном без дела?

Если да, есть ли способ предотвратить это?


На все вопросы есть один ответ

Ниже описан способ ... 1) Вам необходимо понять, сколько записей / строк на раздел .... на основе этого вы можете repartition или coalesce

Фрагмент 1: Spark 1.6>
spark 2.x позволяет узнать, сколько записей в разделе.

spark_partition_id() существует в org.apache.spark.sql.functions

import org.apache.spark.sql.functions._ 
val df = "<your dataframe read through rdbms.... using spark.read.jdbc>"
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show

Фрагмент 2: для всех версий Spark

df
  .rdd
  .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
  .toDF("partition_number","NumberOfRecordsPerPartition")
  .show

и затем вам нужно снова включить свою стратегию, запросить настройку между диапазонами или перераспределением и т. Д. ...., вы можете использовать mappartitions или foreachpartitions

Вывод: Я предпочитаю использовать данныеопции, которые работают с числовыми столбцами, так как я видел, что это делит данные на единообразные по границам / разделам.

Некоторое время может быть невозможно использовать эту опцию, тогда потребуется ручная настройка разделов / парлеллизма..


Обновление:

С помощью приведенного ниже мы можем добиться равномерного распределения ...

  1. Получить первичный ключ таблицы.
  2. Найдите ключевые минимальные и максимальные значения.
  3. Выполните Spark с этими значениями.

def main(args: Array[String]){
// parsing input parameters ...
val primaryKey = executeQuery(url, user, password, s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)
val result = executeQuery(url, user, password, s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
    val min = result.getString(1).toInt
    val max = result.getString(2).toInt
    val numPartitions = (max - min) / 5000 + 1
val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()
var df = spark.read.format("jdbc").
option("url", s"${url}${config("schema")}").
option("driver", "com.mysql.jdbc.Driver").
option("lowerBound", min).
option("upperBound", max).
option("numPartitions", numPartitions).
option("partitionColumn", primaryKey).
option("dbtable", config("table")).
option("user", user).
option("password", password).load()
// some data manipulations here ...
df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)      
}
...