Искра - секунды до ЧЧ / мм аа - PullRequest
0 голосов
/ 05 марта 2020

У меня есть скрипт, который в настоящее время отображает 30-минутные периоды и вычисляет среднее значение этих 30 минут.

Чтобы заставить мои окна работать так, как я хотел, мне нужно было преобразовать базовую метку времени MM/dd/yyyy HH:mm:ss aa в unix_timestamp только для часов и минут.

Текущий код:

val taxiSub = spark.read.format("csv").option("header", true).option("inferSchema", true).load("/user/zeppelin/taxi/taxi_subset.csv")
taxiSub.createOrReplaceTempView("taxiSub")
val time=taxiSub.withColumn("Pickup",from_unixtime(unix_timestamp(col(("tpep_pickup_datetime")),"MM/dd/yyyy hh:mm:ss aa"),"MM/dd/yyyy HH:mm")).withColumn("Dropoff",from_unixtime(unix_timestamp(col(("tpep_dropoff_datetime")),"MM/dd/yyyy hh:mm:ss aa"),"MM/dd/yyyy HH:mm"))
val stamp = time.withColumn("tmp",to_timestamp(col("Pickup"),"MM/dd/yyyy HH:mm"))
.withColumn("StartTimestamp", unix_timestamp(concat_ws(":",hour(col("tmp")),minute(col("tmp"))),"HH:mm")).drop("tmp")

val windowSpec = Window.orderBy("StartTimestamp").rangeBetween(-1800,Window.currentRow)
val byRange = stamp.withColumn("avgPassengers",avg(col("passenger_count")).over(windowSpec)).orderBy(desc("StartTimestamp")).withColumn("EndTime",col("StartTimestamp")+1800)
val answer = byRange.withColumn("Start",)

byRange.createOrReplaceTempView("byRangeTable")
spark.sqlContext.sql("SELECT StartTimestamp,EndTime,avg(avgPassengers) as AvgPassengers  FROM byRangeTable group by StartTimestamp,EndTime ORDER BY AvgPassengers DESC ").show(truncate=false)

Текущий вывод:

+--------------+-------+------------------+
|StartTimestamp|EndTime|AvgPassengers     |
+--------------+-------+------------------+
|28140         |29940  |2.0851063829787235|
|28200         |30000  |2.0833333333333335|
|26940         |28740  |2.0714285714285716|

Как преобразовать StartTimestamp и EndTime обратно в форму HH/mm aa.

Т.е. я пытаюсь чтобы преобразовать вышеупомянутое в:

+--------------+------------+------------------+
|StartTimestamp|EndTime     |AvgPassengers     |
+--------------+------------+------------------+
|07:49:00 am   |08:19:00 am |2.0851063829787235|
|07:50:00 am   |08:20:00 am |2.0833333333333335|
|07:29:00 am   |07:59:00 am |2.0714285714285716|

1 Ответ

1 голос
/ 05 марта 2020

Используйте функцию from_unixtime() с форматом вывода как 'hh:mm:ss a'.

Example:

spark.sql("select from_unixtime('28140','hh:mm:ss a')").show()
//+-----------+
//|        _c0|
//+-----------+
//|01:49:00 AM|
//+-----------+

For your case:

//in dataframe api
df.withColumn("StartTimestamp",from_unixtime(col("StartTimestamp"),"hh:mm:ss a")).
withColumn("EndTime",from_unixtime(col("EndTime"),"hh:mm:ss a")).show()

//in sql
sqlContext.sql("select from_unixtime(StartTimestamp,'hh:mm:ss a') as StartTimestamp,from_unixtime(EndTime,'hh:mm:ss a') as EndTime,AvgPassengers from tmp").show()

//timestamp values differ from question based on session timezone
//+--------------+-----------+------------------+
//|StartTimestamp|    EndTime|     AvgPassengers|
//+--------------+-----------+------------------+
//|   01:49:00 AM|02:19:00 AM|2.0851063829787235|
//|   01:50:00 AM|02:20:00 AM|2.0833333333333335|
//+--------------+-----------+------------------+
...