Обновите значение Minute and Seconds в столбце Dataframe с помощью Pyspark - PullRequest
0 голосов
/ 05 февраля 2020

У меня есть DF следующим образом:

Name    starttime               endtime
user1   2019-08-02 03:34:45   2019-08-02 03:52:03
user2   2019-08-13 13:34:10   2019-08-13 14:02:10

Я хотел бы проверить, кровоточит ли endtime в следующий час, и если это произойдет, то обновите его до последней минуты и секунды текущего часа как показано ниже.

Name    starttime               endtime
user1   2019-08-02 03:34:45   2019-08-02 03:52:03
user2   2019-08-13 13:34:10   2019-08-13 13:59:59

Я могу выполнить проверку и заменить, как показано ниже, используя UDF, но предпочел бы не использовать их.

def adjust_end_hour(date):
    return date.replace(second=59,minute=59)
adjust_end_hour_udf = udf(adjust_end_hour, TimestampType())
df = df.\
        filter(df.endtime > adjust_end_hour_udf(df.starttime)).\
withColumn('enddtime', adjust_end_hour_udf(df.starttime))

Как я могу это сделать, не используя UDF в pyspark?

Спасибо

Ответы [ 2 ]

1 голос
/ 05 февраля 2020

Другим решением было бы усечь starttime до часа, затем добавить 59 секунд и 59 минут, используя синтаксис SQL INTERVAL, например:

adjust_expr = "date_trunc('hour', starttime) + INTERVAL 59 seconds + INTERVAL 59 minutes"

df.withColumn("endtime",
              when(col("endtime") > expr(adjust_expr),
                   expr(adjust_expr)
                  ).otherwise(col("endtime"))
              )\
  .show()

Дает:

+-----+-------------------+-------------------+
| name|          starttime|            endtime|
+-----+-------------------+-------------------+
|user1|2019-08-02 03:34:45|2019-08-02 03:52:03|
|user2|2019-08-13 13:34:10|2019-08-13 13:59:59|
+-----+-------------------+-------------------+
0 голосов
/ 05 февраля 2020

Предполагая, что ваш DataFrame имеет следующую схему:

df.printSchema()
#root
# |-- Name: string (nullable = true)
# |-- starttime: timestamp (nullable = true)
# |-- endtime: timestamp (nullable = true)

т.е. где starttime и endtime оба TimestampType().

Вы можете проверить если endtime кровоточит в следующий час, сравнивая hour части starttime и endtime. Если они не равны 1 , это означает, что вам нужно усечь время окончания.

from pyspark.sql.functions import col, hour

df.withColumn(
    "bleeds_into_next_hour", 
    hour(col("endtime")) != hour(col("starttime"))
).show()
#+-----+-------------------+-------------------+---------------------+
#| Name|          starttime|            endtime|bleeds_into_next_hour|
#+-----+-------------------+-------------------+---------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:03|                false|
#|user2|2019-08-13 13:34:10|2019-08-13 14:02:10|                 true|
#+-----+-------------------+-------------------+---------------------+

Здесь указывается, какие строки необходимо изменить. Вы можете почти получить желаемый выходной сигнал, используя date_trunc с параметром format, установленным на hour:

from pyspark.sql.functions import date_trunc, when

df.withColumn(
    "bleeds_into_next_hour", 
    hour(col("endtime")) != hour(col("starttime"))
).withColumn(
    "endtime", 
    when(
        col("bleeds_into_next_hour"), 
        date_trunc('hour', "endtime")
    ).otherwise(col("endtime"))
).show()
#+-----+-------------------+-------------------+---------------------+
#| Name|          starttime|            endtime|bleeds_into_next_hour|
#+-----+-------------------+-------------------+---------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:03|                false|
#|user2|2019-08-13 13:34:10|2019-08-13 14:00:00|                 true|
#+-----+-------------------+-------------------+---------------------+

Все, что вам теперь нужно, это вычесть 1 секунду из endtime. Самый простой способ - преобразовать unix_timestamp, вычесть 1, а затем преобразовать обратно, используя from_unixtime.

from pyspark.sql.functions import from_unixtime, unix_timestamp

df.withColumn(
    "bleeds_into_next_hour", 
    hour(col("endtime")) != hour(col("starttime"))
).withColumn(
    "endtime", 
    from_unixtime(
        unix_timestamp(
            when(
                col("bleeds_into_next_hour"), 
                date_trunc('hour', "endtime")
            ).otherwise(col("endtime"))
        ) - 1
    )
).drop("bleeds_into_next_hour").show()
#+-----+-------------------+-------------------+
#| Name|          starttime|            endtime|
#+-----+-------------------+-------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:02|
#|user2|2019-08-13 13:34:10|2019-08-13 13:59:59|
#+-----+-------------------+-------------------+

Поместив все это вместе, без промежуточного столбца:

from pyspark.sql.functions import col, date_trunc, from_unixtime, hour, unix_timestamp, when

df = df.withColumn(
    "endtime", 
    from_unixtime(
        unix_timestamp(
            when(
                hour(col("endtime")) != hour(col("starttime")), 
                date_trunc('hour', "endtime")
            ).otherwise(col("endtime"))
        ) - 1
    )
)

Примечания

  1. Предполагая, что endtime всегда больше или равно starttime. Вы не можете сделать >, потому что часы переносятся после часа 12.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...