Spark: рассчитывает на окно, не работающее в течение миллисекунды - PullRequest
0 голосов
/ 08 июня 2018

Вы можете создать окно для подсчета количества повторений записи за последние 7 дней.Однако, если вы попытаетесь посмотреть, сколько раз запись происходила на миллисекундном уровне, она ломается.

Короче говоря, приведенная ниже функция df.timestamp.astype('Timestamp').cast("long") преобразует только временную метку до размеравторой долго.Это игнорирует миллисекунду.Как превратить всю метку времени, включая миллисекунды, в длинную.Вам нужно, чтобы значение было длинным, чтобы оно работало с окном.

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import unix_timestamp

df = sqlContext.createDataFrame([
        ("a", "u8u", "2018-02-02 05:46:41.438357"),
        ("a", "u8u", "2018-02-02 05:46:41.439377"),
        ("a", "a3a", "2018-02-02 09:48:34.081818"),
        ("a", "a3a", "2018-02-02 09:48:34.095586"),
        ("a", "g8g", "2018-02-02 09:48:56.006206"),
        ("a", "g8g", "2018-02-02 09:48:56.007974"),
        ("a", "9k9", "2018-02-02 12:50:48.000000"),
        ("a", "9k9", "2018-02-02 12:50:48.100000"),
], ["person_id", "session_id", "timestamp"])


df = df.withColumn('unix_ts',df.timestamp.astype('Timestamp').cast("long"))
df = df.withColumn("DayOfWeek",F.date_format(df.timestamp, 'EEEE'))

w = Window.partitionBy('person_id','DayOfWeek').orderBy('unix_ts').rangeBetween(-86400*7,-1)
df = df.withColumn('count',F.count('unix_ts').over(w))
df.sort(df.unix_ts).show(20,False)


+---------+----------+--------------------------+----------+---------+-----+
|person_id|session_id|timestamp                 |unix_ts   |DayOfWeek|count|
+---------+----------+--------------------------+----------+---------+-----+
|a        |u8u       |2018-02-02 05:46:41.438357|1517572001|Friday   |0    |
|a        |u8u       |2018-02-02 05:46:41.439377|1517572001|Friday   |0    |
|a        |a3a       |2018-02-02 09:48:34.081818|1517586514|Friday   |2    |
|a        |a3a       |2018-02-02 09:48:34.095586|1517586514|Friday   |2    |
|a        |g8g       |2018-02-02 09:48:56.006206|1517586536|Friday   |4    |
|a        |g8g       |2018-02-02 09:48:56.007974|1517586536|Friday   |4    |
|a        |9k9       |2018-02-02 12:50:48.000000|1517597448|Friday   |6    |
|a        |9k9       |2018-02-02 12:50:48.100000|1517597448|Friday   |6    |
+---------+----------+--------------------------+----------+---------+-----+

Количество должно быть 0,1,2,3,4,5 ... вместо 0,0,2,2,4,4, ...

1 Ответ

0 голосов
/ 08 июня 2018

Вы можете использовать pyspark.sql.functions.unix_timestamp() для преобразования строкового столбца во временную метку вместо приведения к long.

import pyspark.sql.functions as F
df.select(
    "timestamp",
    F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("unix_ts")
).show(truncate=False)
#+--------------------------+----------+
#|timestamp                 |unix_ts   |
#+--------------------------+----------+
#|2018-02-02 05:46:41.438357|1517568839|
#|2018-02-02 05:46:41.439377|1517568840|
#|2018-02-02 09:48:34.081818|1517582995|
#|2018-02-02 09:48:34.095586|1517583009|
#|2018-02-02 09:48:56.006206|1517582942|
#|2018-02-02 09:48:56.007974|1517582943|
#|2018-02-02 12:50:48.862644|1517594710|
#|2018-02-02 12:50:49.981848|1517594830|
#+--------------------------+----------+

Второй аргумент unix_timestamp() - это форматстрока.В вашем случае используйте "yyyy-MM-dd HH:mm:ss.SSSSSS".


Соответствующее изменение, примененное к вашему коду:

df = df.withColumn(
    'unix_ts',
    F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSS")
)
df = df.withColumn("DayOfWeek", F.date_format(df.timestamp, 'EEEE'))

w = Window.partitionBy('person_id','DayOfWeek').orderBy('unix_ts').rangeBetween(-86400*7,-1)
df = df.withColumn('count',F.count('unix_ts').over(w))
df.sort(df.unix_ts).show(20,False)
#+---------+----------+--------------------------+----------+---------+-----+
#|person_id|session_id|timestamp                 |unix_ts   |DayOfWeek|count|
#+---------+----------+--------------------------+----------+---------+-----+
#|a        |u8u       |2018-02-02 05:46:41.438357|1517568839|Friday   |0    |
#|a        |u8u       |2018-02-02 05:46:41.439377|1517568840|Friday   |1    |
#|a        |g8g       |2018-02-02 09:48:56.006206|1517582942|Friday   |2    |
#|a        |g8g       |2018-02-02 09:48:56.007974|1517582943|Friday   |3    |
#|a        |a3a       |2018-02-02 09:48:34.081818|1517582995|Friday   |4    |
#|a        |a3a       |2018-02-02 09:48:34.095586|1517583009|Friday   |5    |
#|a        |9k9       |2018-02-02 12:50:48.862644|1517594710|Friday   |6    |
#|a        |9k9       |2018-02-02 12:50:49.981848|1517594830|Friday   |7    |
#+---------+----------+--------------------------+----------+---------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...