Я работаю над усреднением некоторых значений температуры в Scala и пытаюсь найти самую холодную температуру для каждого часового периода. Затем, какой бы период ни имел самую холодную температуру и наибольшее количество вхождений во всем наборе данных, я хочу выбрать это время.
Кажется, мой код выполняет мои вычисления и корректно работает с окнами, единственная проблема, которую я имею, это необходимо иметь возможность правильно выводить окно синхронизации. Приведенный ниже код извлекает отдельные столбцы из моего входного файла (месяц, день, год, время, все разбитые на отдельные столбцы) и объединяет их в один столбец.
Мой код
val data = osh.select(col("TemperatureF"), concat(format_string("%02d",col("Month")),lit("/"),format_string("%02d",col("Day")),lit("/"),col("Year"),lit(" "),col("TimeCST")).as("Date")).show()
Пример ввода из приведенного выше кода:
+------------+-------------------+
|TemperatureF| Date|
+------------+-------------------+
| 35.1|01/01/2000 12:53 AM|
| 35.1| 01/01/2000 1:53 AM|
| 35.1| 01/01/2000 2:53 AM|
| 34.0| 01/01/2000 3:53 AM|
| 32.0| 01/01/2000 4:53 AM|
| 30.9| 01/01/2000 5:53 AM|
| 28.0| 01/01/2000 6:53 AM|
+------------+-------------------+
При обработке этого ввода я пытаюсь преобразовать его из строки (поскольку она объединена) во временную метку в форме: MM/dd/yyyy hh:mm a
. Мой код в настоящее время, кажется, конвертирует мою метку времени в 24-часовой масштаб. Из прочитанного мною чтения hh
вместо HH
должно означать 12-часовой промежуток, а a
должен добавить AM / PM. Любые предложения, как изменить эту проблему?
Полный код:
val data = osh.select(col("TemperatureF"), concat(format_string("%02d",col("Month")),lit("/"),format_string("%02d",col("Day")),lit("/"),col("Year"),lit(" "),col("TimeCST")).as("Date")).show()
val ts = to_timestamp($"Date","MM/dd/yyyy hh:mm a")
val mydata=data.withColumn("ts",ts).show()
val groupByWindow = mydata.groupBy(window(col("ts"), "1 hour")).agg(avg("TemperatureF").as("avgTemp")).select("window.start", "window.end", "avgTemp").show()
val daily = groupByWindow.withColumn("_tmp",split($"start"," ")).select($"_tmp".getItem(0).as("Date"),$"_tmp".getItem(1).as("StartTime"),$"end",$"avgTemp").withColumn("_tmp2",split($"end"," ")).select($"Date",$"StartTime",$"_tmp2".getItem(1).as("EndTime"),$"avgTemp")
daily.createOrReplaceTempView("myview")
spark.sqlContext.sql("Select StartTime,EndTime,avg(avgTemp) avgTemp,count(*) Total from myview group by StartTime,EndTime order by avgTemp ASC, total DESC").show()
Токовый выход:
+---------+--------+-------------------+-----+
|StartTime| EndTime| avgTemp|Total|
+---------+--------+-------------------+-----+
| 10:00:00|11:00:00|-16.314026481823376| 5726|
| 11:00:00|12:00:00|-3.8934910974897816| 5710|
| 09:00:00|10:00:00| 22.41515848657947| 5702|
| 23:00:00|00:00:00| 34.76578133360086| 5696|
+---------+--------+-------------------+-----+
Ожидаемый выход:
+---------+--------+-------------------+-----+
|StartTime| EndTime| avgTemp|Total|
+---------+--------+-------------------+-----+
| 10:00 AM|11:00 AM|-16.314026481823376| 5726|
| 11:00 AM|12:00 PM|-3.8934910974897816| 5710|
| 09:00 AM|10:00 AM| 22.41515848657947| 5702|
| 23:00 PM|12:00 AM| 34.76578133360086| 5696|
+---------+--------+-------------------+-----+