Spark SQL + Date операции - PullRequest
       6

Spark SQL + Date операции

0 голосов
/ 30 августа 2018

Я хочу понять, как лучше всего решать проблемы, связанные с датами, в spark SQL. Я пытаюсь решить простую проблему, где у меня есть файл с диапазонами дат, как показано ниже:

startdate,enddate
01/01/2018,30/01/2018
01/02/2018,28/02/2018
01/03/2018,30/03/2018

и другая таблица с датой и количеством:

date,counts
03/01/2018,10
25/01/2018,15
05/02/2018,23
17/02/2018,43

Теперь все, что я хочу найти, это сумма отсчетов для каждого диапазона дат, поэтому ожидаемый результат:

startdate,enddate,sum(count)
01/01/2018,30/01/2018,25
01/02/2018,28/02/2018,66
01/03/2018,30/03/2018,0

Ниже приведен код, который я написал, но он дает мне декартово множество результатов:

val spark = SparkSession.builder().appName("DateBasedCount").master("local").getOrCreate()
import spark.implicits._

val df1 = spark.read.option("header","true").csv("dateRange.txt").toDF("startdate","enddate")
val df2 = spark.read.option("header","true").csv("dateCount").toDF("date","count")

df1.createOrReplaceTempView("daterange")
df2.createOrReplaceTempView("datecount")

val res = spark.sql("select startdate,enddate,date,visitors from daterange left join datecount on date >= startdate and date <= enddate")
res.rdd.foreach(println)

Вывод:

| startdate|   enddate|      date|visitors|
|01/01/2018|30/01/2018|03/01/2018|      10|
|01/01/2018|30/01/2018|25/01/2018|      15|
|01/01/2018|30/01/2018|05/02/2018|      23|
|01/01/2018|30/01/2018|17/02/2018|      43|
|01/02/2018|28/02/2018|03/01/2018|      10|
|01/02/2018|28/02/2018|25/01/2018|      15|
|01/02/2018|28/02/2018|05/02/2018|      23|
|01/02/2018|28/02/2018|17/02/2018|      43|
|01/03/2018|30/03/2018|03/01/2018|      10|
|01/03/2018|30/03/2018|25/01/2018|      15|
|01/03/2018|30/03/2018|05/02/2018|      23|
|01/03/2018|30/03/2018|17/02/2018|      43|

Теперь, если у меня groupby начальная дата и конечная дата с суммой на счету, я вижу следующий неверный результат:

| startdate|   enddate| sum(count)|
|01/01/2018|30/01/2018|       91.0|
|01/02/2018|28/02/2018|       91.0|
|01/03/2018|30/03/2018|       91.0|

Итак, как мы справимся с этим и как лучше всего обращаться с датами в Spark SQL? Должны ли мы строить столбцы как dateType на первом месте ИЛИ читать как строки, а затем при необходимости приводить их к дате?

1 Ответ

0 голосов
/ 30 августа 2018

Проблема в том, что ваши даты не интерпретируются Spark как даты автоматически, они просто строки. Поэтому решение состоит в том, чтобы преобразовать их в даты:

val df1 = spark.read.option("header","true").csv("dateRange.txt")
  .toDF("startdate","enddate")
  .withColumn("startdate", to_date(unix_timestamp($"startdate", "dd/MM/yyyy").cast("timestamp")))
  .withColumn("enddate", to_date(unix_timestamp($"enddate", "dd/MM/yyyy").cast("timestamp")))
val df2 = spark.read.option("header","true").csv("dateCount")
  .toDF("date","count")
  .withColumn("date", to_date(unix_timestamp($"date", "dd/MM/yyyy").cast("timestamp")))

Затем используйте тот же код, что и раньше. Вывод команды SQL теперь:

+----------+----------+----------+------+
| startdate|   enddate|      date|counts|
+----------+----------+----------+------+
|2018-01-01|2018-01-30|2018-01-03|    10|
|2018-01-01|2018-01-30|2018-01-25|    15|
|2018-02-01|2018-02-28|2018-02-05|    23|
|2018-02-01|2018-02-28|2018-02-17|    43|
|2018-03-01|2018-03-30|      null|  null|
+----------+----------+----------+------+

Если последнюю строку следует игнорировать, просто перейдите на внутреннее соединение.

Использование df.groupBy("startdate", "enddate").sum() на этом новом фрейме данных даст желаемый результат.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...