Искра - 30 минут Generi c Windowing - PullRequest
1 голос
/ 05 марта 2020

В настоящее время я работаю над сценарием спарк, чтобы просмотреть каждый 30-минутный период и определить среднее значение столбца за этот 30-минутный период прокатки.

Формат моей метки времени имеет вид: MM/dd/yyyy HH:mm:ss AM/PM. По сути, я смотрю на каждые 30 минут, не включая даты. (Т.е. средние пассажиры за все дни с 13:02 до 13:32).

Мой текущий скрипт возьмет мою метку времени, преобразует ее в unix метку времени и сохранит ее как новый столбец. Затем, глядя на текущую метку времени, она вычтет 900 секунд и прибавит 900 секунд, чтобы получить записи за предыдущие 15 минут и записи через 15 минут после текущей метки времени. Это даст мне 30-минутное окно, которое я ищу. Это работает, когда я включаю MM/dd/yyyy при создании моего нового столбца 'timestamp':

val taxiSub = spark.read.format("csv").option("header", true).option("inferSchema", true).load("/user/zeppelin/taxi/taxi_subset.csv")
taxiSub.createOrReplaceTempView("taxiSub")
val stamp = taxiSub.withColumn("timestamp", unix_timestamp($"tpep_pickup_datetime", "MM/dd/yyyy HH:mm"))
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("VendorID").orderBy("timestamp").rangeBetween(-900,900)
val answer = stamp.withColumn("AvgPassenger", avg(stamp("passenger_count")).over(windowSpec))
answer.select("VendorID", "tpep_pickup_datetime", "timestamp", "passenger_count", "AvgPassenger")
answer.createOrReplaceTempView("answerTable")
spark.sqlContext.sql("SELECT timestamp, AvgPassenger FROM answerTable ORDER BY AvgPassenger DESC limit 10").show()

Однако это дает мне определенные c даты, включенные в мой диапазон вместо общего c периода времени упомянутый выше. Когда я пытаюсь удалить MM/dd/yyyy из моего поколения меток времени, все мои значения меток времени становятся нулевыми. Кроме того, как я могу учесть AM / PM части моей временной метки?

Будем благодарны за любые мысли.

1 Ответ

1 голос
/ 05 марта 2020

Мы можем использовать unix_timestamp("HH:mm","HH:mm"), чтобы получить общее c значение времени эпохи, а затем использовать это значение в нашем предложении orderBy.

Example:

//import org.apache.spark.sql.expressions._

//sample data
//+--------+---------+---------------+--------------------+
//|VendorID|timestamp|passenger_count|tpep_pickup_datetime|
//+--------+---------+---------------+--------------------+
//|       1|    66180|              3|    12/12/2019 12:23|
//|       1|    66780|              2|    12/13/2018 12:33|
//|       2|    66180|             12|    12/13/2019 12:23|
//|       2|    69780|             13|    12/13/2018 13:23|
//+--------+---------+---------------+--------------------+

val stamp = taxiSub.withColumn("tmp",to_timestamp(col("tpep_pickup_datetime"),"MM/dd/yyyy HH:mm")).//add new timestamp type field
withColumn("timestamp", unix_timestamp(concat_ws(":",hour(col("tmp")),minute(col("tmp"))),"HH:mm")). //extract hour,minute and convert to epoch timestamp value
drop("tmp")

//partition based on vendorid
val windowSpec = Window.partitionBy("VendorID").orderBy("timestamp").rangeBetween(-900,900)

stamp.withColumn("AvgPassenger", avg(stamp("passenger_count")).over(windowSpec)).show()

//+--------+---------+---------------+--------------------+------------+
//|VendorID|timestamp|passenger_count|tpep_pickup_datetime|AvgPassenger|
//+--------+---------+---------------+--------------------+------------+
//|       1|    66180|              3|    12/12/2019 12:23|         2.5|
//|       1|    66780|              2|    12/13/2018 12:33|         2.5|
//|       2|    66180|             12|    12/13/2019 12:23|        12.0|
//|       2|    69780|             13|    12/13/2018 13:23|        13.0|
//+--------+---------+---------------+--------------------+------------+
...