Набор данных Spark Partition по столбцу - PullRequest
1 голос
/ 02 мая 2019

(я новичок в Spark) Мне нужно хранить большое количество строк данных, а затем обрабатывать обновления этих данных.У нас есть уникальные идентификаторы (DB PK) для этих строк, и мы хотели бы отсеять набор данных с помощью uniqueID % numShards, чтобы сделать адресные разделы одинакового размера.Поскольку PK (уникальные идентификаторы) присутствуют как в данных, так и в файлах обновлений, будет легко определить, какой раздел будет обновлен.Мы намерены разделять данные и обновления по одним и тем же критериям и периодически переписывать «shard S + все накопленные обновления для shard S => new shard S».(Мы знаем, как объединить осколок S + обновления = новый осколок S.)

Если это наш дизайн, нам нужно (1) осколок DataFrame одним из его столбцов (скажем, столбец K)в |range(K)| разделах, где гарантируется, что все строки в разделе имеют одинаковое значение в столбце K и (2) смогут найти файл Parquet, который соответствует column_K = k, зная k = row.uniqueID % numShards.

Это хороший дизайн, или Spark предлагает что-то из коробки, что делает нашу задачу намного проще?

Какой класс / метод Spark мы должны использовать для разделения наших данных?Мы смотрим на RangePartitioner, но конструктор запрашивает количество разделов.Мы хотим указать «использовать column_K для разделения и создать один раздел для каждого отдельного значения k in range(K)», потому что мы уже создали column_K = uniqueID % numShards.Какой разделитель подходит для разбиения по значению одного столбца DataFrame?Нужно ли нам создавать пользовательские разделители, или использовать partitionBy, или repartitionByRange, или ...?

Это то, что мы имеем до сих пор:

import org.apache.spark.sql.functions._
val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc_url, "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.parquet("parquet/table_name")

Теперь мынеобходимо указать, что этот DataFrame должен быть разбит на SHARD_ID, прежде чем он будет записан в виде файлов Parquet.

1 Ответ

1 голос
/ 02 мая 2019

Это работает:

val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc.getString("url"), "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.partitionBy("SHARD_ID")
.parquet("parquet/table_name")
...