Я использую Spark ETL для передачи данных из базы данных Oracle в корзину S3 в формате паркета. Процесс очень хорошо работает с набором данных, который составляет <1B строк данных. Тем не менее, когда набор данных превышает 1B строк. Это начинает вызывать пропущенные значения. (Количество Афины / Улья отличается от количества Оракула.) </p>
Я хочу разбить набор данных по строкам, а не по столбцам. Поэтому я должен добавить столбец увеличения идентификаторов и модуль номера разделов. Я действительно не уверен, какой шаг вызвал пропущенные данные, потому что кажется, что он связан только с размером набора данных. Вот код:
val table = sparkSession.read
.format("jdbc")
.option("url", "jdbc:oracle:thin://XXXX")
.option("dbtable", "tcga.%s".format(tableName))
.option("user", "XXXX")
.option("password", "XXXX")
.option("driver", "oracle.jdbc.driver.OracleDriver")
.option("fetchsize", "500000")
.option("numPartitions", "1000")
.option("parquet.block.size", "128 * 1024 * 1024")
.option("dfs.blocksize", "128 * 1024 * 1024")
.load()
println("writing tablename: %s".format(tableName))
//partition the dataset by rows, not columns, hence the new column and the modulous
val finalDF = table.withColumn("rowid", monotonically_increasing_id())
.withColumn("rowPartition", col("rowid") % 100)
finalDF.write.partitionBy("rowPartition").format("parquet").mode("Overwrite").save(XXX)
val df = spark.read.parquet(XXX)
//sort the data, drop the column, re-partition the data df.orderBy("XXX").drop("rowpartition").drop("rowid").repartition(55)
.write.mode("overwrite").parquet(XXX)