Мне нужно сгенерировать 3000000 файлов в качестве выходных данных задания на работу.
У меня есть два входных файла:
File 1 -> Size=3.3 Compressed, No.Of Records=13979835
File 2 -> Size=1.g Compressed, No.Of Records=6170229
Spark Job выполняет следующие действия:
- чтение и этого файла и присоединение к ним на основе общего столбца1.-> DataFrame-A
- Группировка результата DataFrame-A на основе одного столбца2 -> DataFrame-B
- Из DataFrame-B использовала array_join для агрегированного столбца и отделить этот столбец от '\ n'char.-> DataFrame-C
Запись результата разбиения DataFrame-C по столбцу 2.
val DF1 = sparkSession.read.json("FILE1") // |ID |isHighway|isRamp|pvId |linkIdx|ffs |length |
val DF12 = sparkSession.read.json("FILE2") // |lId |pid |
val joinExpression = DF1.col("pvId") === DF2.col("lId")
val DFA = DF.join(tpLinkDF, joinExpression, "inner").select(col("ID").as("SCAR"), col("lId"), col("length"), col("ffs"), col("ar"), col("pid")).orderBy("linkIdx")
val DFB = DFA.select(col("SCAR"),concat_ws(",", col("lId"), col("length"),col("ffs"), col("ar"), col("pid")).as("links")).groupBy("SCAR").agg(collect_list("links").as("links"))
val DFC = DFB.select(col("SCAR"), array_join(col("links"), "\n").as("links"))
DFC.write.format("com.databricks.spark.csv").option("quote", "\u0000").partitionBy("SCAR").mode(SaveMode.Append).format("csv").save("/tmp")
Мне нужно сгенерировать 3000000 файлов в качестве вывода задания на искру.