Spark Scala - строка с меткой времени - PullRequest
0 голосов
/ 09 апреля 2020

Я работаю над усреднением некоторых значений температуры в Scala и пытаюсь найти самую холодную температуру для каждого часового периода. Затем, какой бы период ни имел самую холодную температуру и наибольшее количество вхождений во всем наборе данных, я хочу выбрать это время.

Кажется, мой код выполняет мои вычисления и корректно работает с окнами, единственная проблема, которую я имею, это необходимо иметь возможность правильно выводить окно синхронизации. Приведенный ниже код извлекает отдельные столбцы из моего входного файла (месяц, день, год, время, все разбитые на отдельные столбцы) и объединяет их в один столбец.

Мой код

val data = osh.select(col("TemperatureF"), concat(format_string("%02d",col("Month")),lit("/"),format_string("%02d",col("Day")),lit("/"),col("Year"),lit(" "),col("TimeCST")).as("Date")).show()

Пример ввода из приведенного выше кода:

+------------+-------------------+
|TemperatureF|               Date|
+------------+-------------------+
|        35.1|01/01/2000 12:53 AM|
|        35.1| 01/01/2000 1:53 AM|
|        35.1| 01/01/2000 2:53 AM|
|        34.0| 01/01/2000 3:53 AM|
|        32.0| 01/01/2000 4:53 AM|
|        30.9| 01/01/2000 5:53 AM|
|        28.0| 01/01/2000 6:53 AM|
+------------+-------------------+

При обработке этого ввода я пытаюсь преобразовать его из строки (поскольку она объединена) во временную метку в форме: MM/dd/yyyy hh:mm a. Мой код в настоящее время, кажется, конвертирует мою метку времени в 24-часовой масштаб. Из прочитанного мною чтения hh вместо HH должно означать 12-часовой промежуток, а a должен добавить AM / PM. Любые предложения, как изменить эту проблему?

Полный код:

val data = osh.select(col("TemperatureF"), concat(format_string("%02d",col("Month")),lit("/"),format_string("%02d",col("Day")),lit("/"),col("Year"),lit(" "),col("TimeCST")).as("Date")).show()
val ts = to_timestamp($"Date","MM/dd/yyyy hh:mm a")
val mydata=data.withColumn("ts",ts).show()

val groupByWindow = mydata.groupBy(window(col("ts"), "1 hour")).agg(avg("TemperatureF").as("avgTemp")).select("window.start", "window.end", "avgTemp").show()

val daily = groupByWindow.withColumn("_tmp",split($"start"," ")).select($"_tmp".getItem(0).as("Date"),$"_tmp".getItem(1).as("StartTime"),$"end",$"avgTemp").withColumn("_tmp2",split($"end"," ")).select($"Date",$"StartTime",$"_tmp2".getItem(1).as("EndTime"),$"avgTemp")

daily.createOrReplaceTempView("myview")


spark.sqlContext.sql("Select StartTime,EndTime,avg(avgTemp) avgTemp,count(*) Total from myview group by StartTime,EndTime order by avgTemp ASC, total DESC").show()

Токовый выход:

+---------+--------+-------------------+-----+
|StartTime| EndTime|            avgTemp|Total|
+---------+--------+-------------------+-----+
| 10:00:00|11:00:00|-16.314026481823376| 5726|
| 11:00:00|12:00:00|-3.8934910974897816| 5710|
| 09:00:00|10:00:00|  22.41515848657947| 5702|
| 23:00:00|00:00:00|  34.76578133360086| 5696|
+---------+--------+-------------------+-----+

Ожидаемый выход:

+---------+--------+-------------------+-----+
|StartTime| EndTime|            avgTemp|Total|
+---------+--------+-------------------+-----+
| 10:00 AM|11:00 AM|-16.314026481823376| 5726|
| 11:00 AM|12:00 PM|-3.8934910974897816| 5710|
| 09:00 AM|10:00 AM|  22.41515848657947| 5702|
| 23:00 PM|12:00 AM|  34.76578133360086| 5696|
+---------+--------+-------------------+-----+

Ответы [ 2 ]

1 голос
/ 09 апреля 2020

Использование date_format() - решит вашу проблему

scala> val daily = groupByWindow.withColumn("_tmp",split($"start"," ")).select($"_tmp".getItem(0).as("Date"),date_format($"_tmp".getItem(1),"hh:mm:ss a").as("startTime"),$"end",$"avgTemp").withColumn("_tmp2",spl it($"end"," ")).select($"Date",$"StartTime",date_format($"_tmp2".getItem(1),"hh:mm:ss a").as("EndTime"),$"avgTemp")
scala> daily.show
+----------+-----------+-----------+-------+
|      Date|  StartTime|    EndTime|avgTemp|
+----------+-----------+-----------+-------+
|2000-01-01|01:30:00 AM|02:30:00 AM|   35.1|
|2000-01-01|03:30:00 AM|04:30:00 AM|   34.0|
|2000-01-01|02:30:00 AM|03:30:00 AM|   35.1|
|2000-01-01|06:30:00 AM|07:30:00 AM|   28.0|
|2000-01-01|12:30:00 AM|01:30:00 AM|   35.1|
|2000-01-01|05:30:00 AM|06:30:00 AM|   30.9|
|2000-01-01|04:30:00 AM|05:30:00 AM|   32.0|
+----------+-----------+-----------+-------+
1 голос
/ 09 апреля 2020

Вам необходимо преобразовать String StartDate & EndDate за день в метку времени, а затем снова в String с требуемым форматом.

Las 4 с методамиColumns, добавленными к исходному коду

  val daily = groupByWindow.withColumn("_tmp",split($"start"," ")).select($"_tmp".getItem(0).as("Date"),$"_tmp".getItem(1).as("StartTime"),$"end",$"avgTemp").withColumn("_tmp2",split($"end"," ")).select($"Date",$"StartTime",$"_tmp2".getItem(1).as("EndTime"),$"avgTemp")
              .withColumn("StartTime", unix_timestamp($"StartTime", "HH:mm:ss"))
              .withColumn("StartTime", from_unixtime($"StartTime", "HH:mm a"))
              .withColumn("EndTime", unix_timestamp($"EndTime", "HH:mm:ss"))
              .withColumn("EndTime", from_unixtime($"EndTime", "HH:mm a"))

А затем следующие шаги, которые у вас уже есть:

daily.createOrReplaceTempView("myview")

spark.sqlContext.sql("Select StartTime,EndTime,avg(avgTemp) avgTemp,count(*) Total from myview group by StartTime,EndTime order by avgTemp ASC, total DESC")

Вывод

+---------+--------+-------+-----+
|StartTime| EndTime|avgTemp|Total|
+---------+--------+-------+-----+
| 06:00 AM|07:00 AM|   28.0|    1|
| 05:00 AM|06:00 AM|   30.9|    1|
| 04:00 AM|05:00 AM|   32.0|    1|
| 03:00 AM|04:00 AM|   34.0|    1|
| 00:00 AM|01:00 AM|   35.1|    1|
| 01:00 AM|02:00 AM|   35.1|    1|
| 02:00 AM|03:00 AM|   35.1|    1|
+---------+--------+-------+-----+
...