Предполагая, что ваш DataFrame имеет следующую схему:
df.printSchema()
#root
# |-- Name: string (nullable = true)
# |-- starttime: timestamp (nullable = true)
# |-- endtime: timestamp (nullable = true)
т.е. где starttime
и endtime
оба TimestampType()
.
Вы можете проверить если endtime
кровоточит в следующий час, сравнивая hour
части starttime
и endtime
. Если они не равны 1 , это означает, что вам нужно усечь время окончания.
from pyspark.sql.functions import col, hour
df.withColumn(
"bleeds_into_next_hour",
hour(col("endtime")) != hour(col("starttime"))
).show()
#+-----+-------------------+-------------------+---------------------+
#| Name| starttime| endtime|bleeds_into_next_hour|
#+-----+-------------------+-------------------+---------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:03| false|
#|user2|2019-08-13 13:34:10|2019-08-13 14:02:10| true|
#+-----+-------------------+-------------------+---------------------+
Здесь указывается, какие строки необходимо изменить. Вы можете почти получить желаемый выходной сигнал, используя date_trunc
с параметром format
, установленным на hour
:
from pyspark.sql.functions import date_trunc, when
df.withColumn(
"bleeds_into_next_hour",
hour(col("endtime")) != hour(col("starttime"))
).withColumn(
"endtime",
when(
col("bleeds_into_next_hour"),
date_trunc('hour', "endtime")
).otherwise(col("endtime"))
).show()
#+-----+-------------------+-------------------+---------------------+
#| Name| starttime| endtime|bleeds_into_next_hour|
#+-----+-------------------+-------------------+---------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:03| false|
#|user2|2019-08-13 13:34:10|2019-08-13 14:00:00| true|
#+-----+-------------------+-------------------+---------------------+
Все, что вам теперь нужно, это вычесть 1 секунду из endtime
. Самый простой способ - преобразовать unix_timestamp
, вычесть 1, а затем преобразовать обратно, используя from_unixtime
.
from pyspark.sql.functions import from_unixtime, unix_timestamp
df.withColumn(
"bleeds_into_next_hour",
hour(col("endtime")) != hour(col("starttime"))
).withColumn(
"endtime",
from_unixtime(
unix_timestamp(
when(
col("bleeds_into_next_hour"),
date_trunc('hour', "endtime")
).otherwise(col("endtime"))
) - 1
)
).drop("bleeds_into_next_hour").show()
#+-----+-------------------+-------------------+
#| Name| starttime| endtime|
#+-----+-------------------+-------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:02|
#|user2|2019-08-13 13:34:10|2019-08-13 13:59:59|
#+-----+-------------------+-------------------+
Поместив все это вместе, без промежуточного столбца:
from pyspark.sql.functions import col, date_trunc, from_unixtime, hour, unix_timestamp, when
df = df.withColumn(
"endtime",
from_unixtime(
unix_timestamp(
when(
hour(col("endtime")) != hour(col("starttime")),
date_trunc('hour', "endtime")
).otherwise(col("endtime"))
) - 1
)
)
Примечания
- Предполагая, что
endtime
всегда больше или равно starttime
. Вы не можете сделать >
, потому что часы переносятся после часа 12.