Я пытался использовать Spark вместо Sq oop для переноса таблицы Oracle в HDFS. Размер выходной таблицы составляет около 122 (ГБ, snappy, паркет) с ~ 1 140 000 000 записей.
В одном кластере (5 узлов) Sq oop с 50 преобразователями занимает час для миграции (parquet / snappy), но Spark со следующей конфигурацией занимает 2,5 часа .
spark.sparkContext.hadoopConfiguration.set("parquet.block.size", (1024 * 1024 * 256).toString)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.task.cpus", "5")
spark.conf.set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
val dbtable = "(SELECT * FROM pd_prd) tmp"
val df = spark.read
.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("dbtable", dbtable)
.option("user", user)
.option("password", password)
.option("numPartitions", 50)
.option("partitionColumn", "prd_no")
.option("lowerBound", 0)
.option("upperBound", 2796542250L)
.option("fetchSize", 400000)
.load()
df.write.mode("overwrite").saveAsTable("tmp.test")
С 50 исполнителями, 6 ядрами на исполнителя, память драйвера 4G, память исполнителя 16G.
Это работа только для карт, тогда я думаю, что должно быть нет drasti c производительность разница между двумя. Но можете ли вы догадаться, почему искра так медленная здесь?
Какие-либо другие варианты оптимизации, которые я могу рассмотреть, чтобы минимизировать время выполнения Spark?
Здесь Выход Spark искажается в отличие от Sq oop. Как я вижу, Sq oop внутренне обрабатывает этот перекос путем подсчета общего и общего / общего числа карт для равномерного распределения. Как вы думаете, что я могу реализовать аналогичную функцию в Spark? (AFAIK, Spark просто группирует partitionColumn так, что 2796542250/50: = 55 000 000, и выдает, например, что-то вроде «SELECT * FROM pd_prd WHERE prd_no <55000000», что приводит к перекосу, поскольку старые продукты (более низкое prd_no) уже удалены из стол.) </p>