Как мне использовать flatmap с несколькими столбцами в Dataframe, используя Pyspark - PullRequest
0 голосов
/ 05 февраля 2020

У меня есть DF, как показано ниже:

Name    city       starttime               endtime
user1   London      2019-08-02 03:34:45   2019-08-02 03:52:03
user2   Boston      2019-08-13 13:34:10   2019-08-13 15:02:10

Я хотел бы проверить endtime, и если он перейдет в следующий час, обновите текущую запись с последней минутой / секундой текущего часа и добавьте еще одну строку или строки с аналогичными данными, как показано ниже (user2). Я использую flapmap или конвертирую DF в RDD и использую карту или есть другой способ?

Name    city     starttime               endtime
user1   London   2019-08-02 03:34:45   2019-08-02 03:52:03
user2   Boston   2019-08-13 13:34:10   2019-08-13 13:59:59
user2   Boston   2019-08-13 14:00:00   2019-08-13 14:59:59
user2   Boston   2019-08-13 15:00:00   2019-08-13 15:02:10

Спасибо

1 Ответ

1 голос
/ 05 февраля 2020
 >>> from pyspark.sql.functions  import *
 >>> df.show()
    +-----+------+-------------------+-------------------+
    | Name|  city|          starttime|            endtime|
    +-----+------+-------------------+-------------------+
    |user1|London|2019-08-02 03:34:45|2019-08-02 03:52:03|
    |user2|Boston|2019-08-13 13:34:10|2019-08-13 15:02:10|
    +-----+------+-------------------+-------------------+

>>> df1 = df.withColumn("diff", ((hour(col("endtime")) - hour(col("starttime")))).cast("Int"))
            .withColumn("loop", expr("split(repeat(':', diff),':')"))
            .select(col("*"), posexplode(col("loop")).alias("pos", "value"))
            .drop("value", "loop")

>>> df1.withColumn("starttime", when(col("pos") == 0, col("starttime")).otherwise(from_unixtime(unix_timestamp(col("starttime")) + (col("pos") * 3600) - minute(col("starttime"))*60 - second(col("starttime")))))
       .withColumn("endtime", when((col("diff") - col("pos")) == 0, col("endtime")).otherwise(from_unixtime(unix_timestamp(col("endtime")) - ((col("diff") - col("pos")) * 3600) - minute(col("endtime"))*60 - second(col("endtime")) + lit(59) * lit(60) + lit(59))))
       .drop("diff", "pos")
       .show()
    +-----+------+-------------------+-------------------+
    | Name|  city|          starttime|            endtime|
    +-----+------+-------------------+-------------------+
    |user1|London|2019-08-02 03:34:45|2019-08-02 03:52:03|
    |user2|Boston|2019-08-13 13:34:10|2019-08-13 13:59:59|
    |user2|Boston|2019-08-13 14:00:00|2019-08-13 14:59:59|
    |user2|Boston|2019-08-13 15:00:00|2019-08-13 15:02:10|
    +-----+------+-------------------+-------------------+
...