Получение ежемесячных данных с помощью Spark Scala - PullRequest
0 голосов
/ 27 мая 2020

Я пытаюсь извлечь данные из файла в течение месяца, а затем обработать их. Обычно мне нужно извлекать данные за каждый месяц и выполнять некоторые преобразования. Поскольку моя работа выполняется ежедневно, я хочу использовать ее и заполнять данные за этот месяц до run_date.

У меня есть два подхода:

Подход 1:

заполняет данные только за предыдущий месяц. Например, если моя current_date или run_date находится в месяце May, я буду заполнять данные за месяц April. Это может быть достигнуто путем вычитания месяца из current_date() и вычитания из него 1. Что-то похожее на приведенное ниже:

df.filter(month(to_date(col("startDate")))===month(to_date(current_date())-1))

Это всего лишь идея. Этот код не достигнет того, что я пытаюсь сделать, поскольку я вычитаю только часть месяца и не рассматриваю часть Year.

Но в этом случае моя работа будет выполняться ежедневно, чтобы заполнить те же данные для целый месяц. Нет смысла это делать.

Подход 2:

Если моя current_date - 2020-05-27, я хочу получить данные из 2020-05-01 to 2020-05-26. Если моя текущая дата - 2020-06-01, она должна содержать данные за май месяц, начиная с 2020-05-01 to 2020-05-31.

Я хочу реализовать Подход 2 . Единственная идея, которую я мог придумать, - это написать пару операторов Case, чтобы проверить даты и соответственно заполнить их.

Может кто-нибудь поделится идеей по этому поводу. Есть ли какой-нибудь немного прямой путь.

Я использую Spark 1.5

1 Ответ

1 голос
/ 27 мая 2020

Проверьте, помогает ли это -

1. Загрузите данные тестирования

val data =
      """
        |2018-04-07 07:07:17
        |2018-04-07 07:32:27
        |2018-04-07 08:36:44
        |2018-04-07 08:38:00
        |2018-04-07 08:39:29
        |2018-04-08 01:43:08
        |2018-04-08 01:43:55
        |2018-04-09 07:52:31
        |2018-04-09 07:52:42
        |2019-01-24 11:52:31
        |2019-01-24 12:52:42
        |2019-01-25 12:52:42
      """.stripMargin
    val df = spark.read
      .schema(StructType(Array(StructField("startDate", DataTypes.TimestampType))))
      .csv(data.split(System.lineSeparator()).toSeq.toDS())
    df.show(false)
    df.printSchema()

Output-


+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
|2019-01-24 11:52:31|
|2019-01-24 12:52:42|
|2019-01-25 12:52:42|
+-------------------+

root
 |-- startDate: timestamp (nullable = true)

2. Создать столбец фильтра на основе current date

    val filterCOl = (currentDate: String) =>  when(datediff(date_format(lit(currentDate), "yyyy-MM-dd")
      ,date_format(lit(currentDate), "yyyy-MM-01"))===lit(0),
     date_format(col("startDate"), "yyyy-MM") ===
       date_format(concat_ws("-",year(lit(currentDate)), month(lit(currentDate)) -1), "yyyy-MM")
    ).otherwise(to_date(col("startDate"))
     .between(date_format(lit(currentDate), "yyyy-MM-01"), lit(currentDate)))

3. Проверить, когда текущие данные находятся в промежутке между месяцами

 var currentDateStr = "2018-04-08"
    df.filter(filterCOl(currentDateStr)).show(false)

Вывод -

+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
+-------------------+

4. Проверить, когда текущие данные - это первый день месяца

currentDateStr = "2018-05-01"
    df.filter(filterCOl(currentDateStr)).show(false)

Выход-

+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
+-------------------+

...