В настоящее время мы читаем дату с использованием экземпляра календаря для выбора записи за последний месяц с использованием sparksql. Теперь нам нужно: в случае добавления дополнительных событий в предыдущий день, мы также должны иметь возможность вручную добавлять итоговые даты начала и окончания, в случае, если нам нужно вручную выполнить задание за предыдущий период времени:
например, таблица ручного запуска может выглядеть так:
rprtng_period_type_cd summary_start_date summary_end_date summary_iv
M 2018-01-01 2018-01-31 2018-01
D 2018-03-05 2018-03-05 2018-03-05
D 2018-03-27 2018-03-27 2018-03-27
Это должно указывать заданию рассчитывать ежемесячную сводку за 18 января и две ежедневные сводки, одну на 05 марта и одну на 27 марта
Задание должно занять summary_start_date
summary_end_date
и обеспечить, чтобы в вычислениях учитывались только события с event_dt
между этими двумя датами.
Мой текущий фрагмент кода выглядит так:
def execute(): Dataframe = {
//log files
val hivecntxt = SparkContextLoader.hiveContext
val eventsourceTable= cusotmermetricConstants.source_table
// Calendar information
val abc = Calendar.getInstance
abc.add(Calendar.month, -1)
var month = abc.get(Calendar.MONTH)
var year = abc.get(Calendar.YEAR)
var fileMonth = month + 1
var monthStr = if (fileMonth<=9) {
monthStr ="0" + fileMonth.toString
} else {
monthStr = fileMonth.toString
}
//testing purpose
monthStr = "11"
year = 2016
val monthlyEventDf = hiveContext.sql("select * from " + referenceDB + " ." + eventsourceTable + "where(unix_timestamp(event_Dt, "yyyy-mm"))")=unix_timestamp("' +year+ "-"+"monthstr"+"',+'yyyy-MM'))")
val uniquedf = monthlyEventDf.repartition(col("event_Id")).withColumn("rank",rank().over(Window.partitionBy("event_Id").orderBy(desc("somevalue")))
val monthlyEventfinal = monthlyEventDf.persist(StorageLevel.Memory_AND_DISK)
return monthlyEventfinal
}
Где мы можем отредактировать наше требование в текущем модуле
Ищу предложения