Если вы используете Spark Verson 1.5+, вы можете использовать pyspark.sql.functions.second()
, чтобы получить секунды из вашего столбца меток времени.
import pyspark.sql.functions as f
df.withColumn("second", f.second("old_timestamp")).show()
#+-------------------+------+
#| old_timestamp|second|
#+-------------------+------+
#|2016-02-09 19:31:02| 2|
#|2016-02-09 19:31:35| 35|
#|2016-02-09 19:31:52| 52|
#|2016-02-09 19:31:28| 28|
#+-------------------+------+
После того, как вы получите часть секунд, выможно взять это число, разделить на 30, округлить и умножить на 30, чтобы получить «новую» секунду.
df.withColumn("second", f.second("old_timestamp"))\
.withColumn("new_second", f.round(f.col("second")/30)*30)\
.show()
#+-------------------+------+----------+
#| old_timestamp|second|new_second|
#+-------------------+------+----------+
#|2016-02-09 19:31:02| 2| 0.0|
#|2016-02-09 19:31:35| 35| 30.0|
#|2016-02-09 19:31:52| 52| 60.0|
#|2016-02-09 19:31:28| 28| 30.0|
#+-------------------+------+----------+
Из «новой» секунды мы можем вычислить смещение в секундах, которое придобавление к исходной отметке времени приведет к получению желаемых «округленных» отметок времени.
df.withColumn("second", f.second("old_timestamp"))\
.withColumn("new_second", f.round(f.col("second")/30)*30)\
.withColumn("add_seconds", f.col("new_second") - f.col("second"))\
.show()
#+-------------------+------+----------+-----------+
#| old_timestamp|second|new_second|add_seconds|
#+-------------------+------+----------+-----------+
#|2016-02-09 19:31:02| 2| 0.0| -2.0|
#|2016-02-09 19:31:35| 35| 30.0| -5.0|
#|2016-02-09 19:31:52| 52| 60.0| 8.0|
#|2016-02-09 19:31:28| 28| 30.0| 2.0|
#+-------------------+------+----------+-----------+
Как мы видим, отрицательное число в этом столбце означает, что исходное время должно быть округлено в меньшую сторону.Положительное число увеличит время.
Чтобы добавить это время к исходной метке времени, сначала преобразуйте ее в метку времени Unix, используя pyspark.sql.functions.unix_timestamp()
.После добавления преобразуйте результат обратно во временную метку, используя pyspark.sql.functions.from_unixtime()
.
Соберите все это вместе (сжимая промежуточные шаги):
df.withColumn(
"add_seconds",
(f.round(f.second("old_timestamp")/30)*30) - f.second("old_timestamp")
)\
.withColumn(
"new_timestamp",
f.from_unixtime(f.unix_timestamp("old_timestamp") + f.col("add_seconds"))
)\
.drop("add_seconds")\
.show()
#+-------------------+-------------------+
#| old_timestamp| new_timestamp|
#+-------------------+-------------------+
#|2016-02-09 19:31:02|2016-02-09 19:31:00|
#|2016-02-09 19:31:35|2016-02-09 19:31:30|
#|2016-02-09 19:31:52|2016-02-09 19:32:00|
#|2016-02-09 19:31:28|2016-02-09 19:31:30|
#+-------------------+-------------------+