У меня есть огромный фрейм данных с 20 миллионами записей. Мне нужно перебрать фрейм данных df1 и прочитать каждую строку одну за другой и построить два других фрейма данных df2 и df3 в качестве выходных данных на основе значений столбца df1.
Ввод - df1 имеет 20 столбцов и 20 миллионов записей. Вывод -df2 имеет 4 столбца и создаст 20 миллионов записей на основе значений столбцов в df1. Вывод - df3 имеет 20 столбцов и создаст 500-800 миллионов записей на основе значений столбцов в df1.
Текущий подход-
//EMI Transaction Schema
val emiDFSchema = StructType(
StructField("AS_OF_DATE",StringType, false)::
StructField("EST_END_DT",StringType, false)::
StructField("EFF_DT",StringType, false)::
StructField("NET_PRCPL_AMT",DecimalType, false)::
...
...
Nil)
val emiDFSchema1 = StructType(
StructField("AS_OF_DATE",StringType, false)::
StructField("EST_END_DT",StringType, false)::
StructField("EFF_DT",StringType, false)::
StructField("NET_PRCPL_AMT",DecimalType, false)::
...
...
Nil)
val rwList = new ListBuffer[Row]()
val rwList1 = new ListBuffer[Row]()
df1.collect().map{row=>
.....
//calculation of attributes based on columns of df1
......
//creating Row object for df2 with all calculated attributes
val row = Row(attr1,attr2,....attr20)
rwList+=(row)
for(i<-1 to n){
...
//calculation of attributes based on columns of df1
...
//creating Row object for df3 with all calculated attributes
val row1 = Row(attr1,attr2,....attr20)
rwList1+=(row1)
}
}
val emiDF1 = spark.createDataFrame(spark.sparkContext.parallelize(rwList),emiSchema)
val emiDF2 = spark.createDataFrame(spark.sparkContext.parallelize(rwList1),emiSchema1)
Поскольку df1 огромен, выполняется collect (). Карта на это занимает огромное количество времени. Не могли бы вы предложить альтернативный способ эффективной итерации df1 за меньшее время?
--- Spark v2.4 --- Scala