AFAIK, нет способа разделить один RDD на несколько RDD как таковых.Именно так работает DAG Spark: только дочерние RDD, извлекающие данные из родительских RDD.
Однако мы можем иметь несколько дочерних RDD, читающих из одного и того же родительского RDD.Чтобы избежать повторного вычисления родительского RDD, нет другого способа, кроме как кэшировать его. Я предполагаю, что вы хотите избежать кэширования, потому что боитесь нехватки памяти. Мы можем избежать проблемы нехватки памяти (OOM), сохранив СДР на MEMORY_AND_DISK
, так что большой СДД попадет на диск, еслии при необходимости.
Давайте начнем с ваших исходных данных:
val allDataRDD = sc.parallelize(Seq(Row(1,1,1),Row(2,2,2),Row(3,3,3)))
Сначала мы можем сохранить это в памяти, но в случае ее нехватки выпустить на диск:
allDataRDD.persist(StorageLevel.MEMORY_AND_DISK)
Затем мы создаем 3 выхода RDD:
filtered_data_1 = allDataRDD.filter(_.get(1)==1) // //
filtered_data_2 = allDataRDD.filter(_.get(2)==1) // use your own filter funcs here
filtered_data_3 = allDataRDD.filter(_.get(3)==1) // //
Затем запишем выходные данные:
var resultDF_1 = sqlContext.createDataFrame(filtered_data_1, schema_1)
resultDF_1.write.parquet(output_path_1)
var resultDF_2 = sqlContext.createDataFrame(filtered_data_2, schema_2)
resultDF_2.write.parquet(output_path_2)
var resultDF_3 = sqlContext.createDataFrame(filtered_data_3, schema_3)
resultDF_3.write.parquet(output_path_3)
Если вы действительно действительно хотите избежать несколькихпроходит, есть обходной путь, используя пользовательский разделитель.Вы можете перераспределить ваши данные на 3 раздела, и у каждого раздела будет своя собственная задача и, следовательно, собственный выходной файл / часть.Предостережение заключается в том, что параллелизм будет сильно сокращен до 3 потоков / задач, а также существует риск> 2 ГБ данных, хранящихся в одном разделе (Spark имеет ограничение 2 ГБ на раздел).Я не предоставляю подробный код для этого метода, потому что я не думаю, что он может писать файлы паркета с другой схемой.