Эффективное зацикливание искрового кадра данных для обработки с сокращенным временем - PullRequest
0 голосов
/ 06 апреля 2020

Я создаю некоторые транзакции EMI на основе нескольких сравнений дат, используя Spark v2.4 и scala. Не могли бы вы предложить альтернативный способ итерации блока данных spark для повышения производительности?

Код -

//EMI Transaction Schema
val emiDFSchema = StructType(
StructField("AS_OF_DATE",StringType, false)::
StructField("EST_END_DT",StringType, false)::
StructField("EFF_DT",StringType, false)::
StructField("NET_PRCPL_AMT",DecimalType, false)::
...
...
Nil)

val rwList = new ListBuffer[Row]()
inputDF.collect().map { row =>
    EST_END_DT = row.getAs[Timestamp]("EST_END_DT").toLocalTime().toLocalDate()
    EFF_DT = row.getAs[Timestamp]("EFF_DT").toLocalTime().toLocalDate()
    AS_OF_DATE = row.getAs[Timestamp]("AS_OF_DATE").toLocalTime().toLocalDate()

   val daysDiff = ChronoUnit.DAYS.Between(AS_OF_DATE,EST_END_DT)

   getPymtDates(daysDiff,pymt_freq) //--> Returns a Map(pymt_no,pymt_dt)

  INTRST = NET_PRCPL_AMT * RATE
  .....
  .....
  val row = Row(EST_END_DT,EFF_DT,PYMT_DT,NET_PRCPL_AMT,INTRST,......<all calculated attributes>)
  rwList+=(row)
  }

 val emiDF = spark.createDataFrame(spark.sparkContext.parallelize(rwList),emiDFSchema)

Приведенный выше код циклически повторяется, если количество записей> 2L. Я считаю, что это связано с использованием collect (). Не могли бы вы предложить лучшую альтернативу для структуры для расчета дат платежей и транзакций EMI, как указано выше?

...