Как проверить данные истории? - PullRequest
0 голосов
/ 03 июля 2018

В настоящее время мы читаем дату с использованием экземпляра календаря для выбора записи за последний месяц с использованием sparksql. Теперь нам нужно: в случае добавления дополнительных событий в предыдущий день, мы также должны иметь возможность вручную добавлять итоговые даты начала и окончания, в случае, если нам нужно вручную выполнить задание за предыдущий период времени: например, таблица ручного запуска может выглядеть так:

rprtng_period_type_cd  summary_start_date summary_end_date  summary_iv
M                         2018-01-01      2018-01-31        2018-01
D                         2018-03-05      2018-03-05        2018-03-05
D                         2018-03-27      2018-03-27        2018-03-27

Это должно указывать заданию рассчитывать ежемесячную сводку за 18 января и две ежедневные сводки, одну на 05 марта и одну на 27 марта

Задание должно занять summary_start_date summary_end_date и обеспечить, чтобы в вычислениях учитывались только события с event_dt между этими двумя датами.

Мой текущий фрагмент кода выглядит так:

def execute(): Dataframe = {
  //log files
  val hivecntxt = SparkContextLoader.hiveContext
  val eventsourceTable= cusotmermetricConstants.source_table

  // Calendar information
  val abc = Calendar.getInstance
  abc.add(Calendar.month, -1)
  var month = abc.get(Calendar.MONTH)
  var year = abc.get(Calendar.YEAR)
  var fileMonth = month + 1
  var monthStr = if (fileMonth<=9) {
    monthStr ="0" + fileMonth.toString
  } else {
    monthStr = fileMonth.toString
  }

  //testing purpose
  monthStr = "11"
  year = 2016
  val monthlyEventDf = hiveContext.sql("select * from " + referenceDB + " ." + eventsourceTable + "where(unix_timestamp(event_Dt, "yyyy-mm"))")=unix_timestamp("' +year+ "-"+"monthstr"+"',+'yyyy-MM'))")
  val uniquedf = monthlyEventDf.repartition(col("event_Id")).withColumn("rank",rank().over(Window.partitionBy("event_Id").orderBy(desc("somevalue")))
  val monthlyEventfinal = monthlyEventDf.persist(StorageLevel.Memory_AND_DISK)

  return monthlyEventfinal
}

Где мы можем отредактировать наше требование в текущем модуле Ищу предложения

1 Ответ

0 голосов
/ 03 июля 2018

Вы можете использовать функцию filter для выбора записей в диапазоне, как показано ниже

//Input df

+---+----------+----------+
| cd|start_date|  end_date|
+---+----------+----------+
|  M|2018-01-01|2018-01-31|
|  D|2018-05-03|2018-05-03|
|  D|2018-03-27|2018-03-27|
+---+----------+----------+

//Parameter startDate and endDate
val endDate="2018-05-03"

val endDate="2018-05-03"

//Filter condition
df.filter(s"start_date>='$startDate' and end_date<='$endDate'").show

//Sample Output: 
+---+----------+----------+
| cd|start_date|  end_date|
+---+----------+----------+
|  D|2018-05-03|2018-05-03|
|  D|2018-03-27|2018-03-27|
+---+----------+----------+

Надеюсь, это поможет вам. Если вы хотите выполнить какие-либо вычисления для отфильтрованных записей, вам нужно передать столбцы в udf

...