Расчеты столбца данных Pyspark - PullRequest
0 голосов
/ 04 февраля 2020

Я учусь использовать кадры данных Pyspark для некоторых проектов, и у меня возник вопрос по поводу создания набора результатов. Ниже приведена таблица, представляющая информацию по секундам о пользователях, вошедших в платформу SaaS. Это, очевидно, происходит от данных о входе и выходе пользователя из системы, которые я успешно могу использовать с помощью DF.

1-й столбец представляет значение секунд в часе, 2-й столбец представляет общее число пользователей, вошедших в систему, 3-й представляет нового пользователя, который вошел в систему в данную секунду, 4-й показывает пользователей, которые вышли из системы.

Например: за сотую секунду всего 1 пользователь вошел в систему и, следовательно, In = 1, Out = 0 За 105 секунд вошли 10 новых пользователей, и, таким образом, всего = 12 и In = 10 за 107 секунды , 11 существующих пользователей вышли из системы и, таким образом, Out = 11 и всего = 1

SecondsInHour total       In  Out
100           1           1   0
101           1           0   0
102           1           0   0
103           2           1   0
104           2           0   0
105           12          10  0
106           12          0   0
107           1           0   11

....

Моя попытка получить этот результат выглядит следующим образом:

df.groupBy('logged_seconds') \
  .agg({'logged_seconds':'count', 'login_offset':'count', 'logout_offset':'count'}) \
  .show()

что не правильно. Как мне получить вышеуказанные результаты? Спасибо

Обновлено для добавления кода:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, explode, udf
from pyspark.sql.types import ArrayType, IntegerType

def seconds_range(start_date,end_date):
    start_seconds = start_date.minute * 60 + start_date.second
    end_seconds = end_date.minute * 60 + end_date.second
    return list(range(start_seconds, end_seconds+1))

def to_seconds(date):
    return date.minute * 60 + date.second

spark = SparkSession.builder.appName('MyApp').master('local[2]').getOrCreate()

# register udf function with spark
seconds_range_udf = udf(seconds_range, ArrayType(IntegerType()))
to_seconds_udf = udf(to_seconds, IntegerType())

# create dataframe with sample data.
df1 = spark.createDataFrame([('user1', '2019-12-01 9:02:00', '2019-12-01 09:04:00'),\
    ('user2', '2019-12-01 9:02:30', '2019-12-01 09:04:00'),\
    ('user3', '2019-12-01 9:03:23', '2019-12-01 09:03:50')],\
    ['user', 'login_start_dt', 'login_end_dt'])

# assign data types to the columns
df1 = df1.select(df1.iqcckey,\
    to_timestamp(df1.login_start_dt , 'yyyy-MM-dd HH:mm:ss').alias('login_start_dt '),\
    to_timestamp(df1.login_end_dt, 'yyyy-MM-dd HH:mm:ss').alias('login_end_dt'))

# construct a new column that is an array of seconds logged in.
df2 = df1.\
    withColumn('login_offset', to_seconds_udf('login_start_dt ')).\
    withColumn('logout_offset', to_seconds_udf('login_end_dt')).\
    withColumn('arr_logged_seconds', seconds_range_udf('login_start_dt ', 'login_end_dt'))

# convert the 3rd column (array) into rows
df2 = df2.withColumn('logged_seconds', explode(df2.arr_logged_seconds))

# group, count data
df2.groupBy('user','logged_seconds').agg({'logged_seconds':'count', 'login_offset':'count', 'logout_offset':'count'}).show()

print('End program')

1 Ответ

0 голосов
/ 10 февраля 2020

Построение вашего фрейма данных:

Добавлены дополнительные данные для проверки.

data=  [[100,1],
        [101,1],  
        [102,1],
        [103,2],
        [104,2],
        [105,12],
        [106,12],
        [107,1],
        [108,2],
        [109,12],
        [110,2],
        [111,0],
        [112,22],
        [113,17],
        [114,20]]
columns= ['SecondsInHour','Total']
df= spark.createDataFrame(data,columns)
df.show()

+-------------+-----+
|SecondsInHour|Total|
+-------------+-----+
|          100|    1|
|          101|    1|
|          102|    1|
|          103|    2|
|          104|    2|
|          105|   12|
|          106|   12|
|          107|    1|
|          108|    2|
|          109|   12|
|          110|    2|
|          111|    0|
|          112|   22|
|          113|   17|
|          114|   20|
+-------------+-----+

Определение windows и очистка нуля для первого значения:

В основном вычитание из запаздывания и используя положительное значение для одного столбца и отрицательное для другого, а затем обрабатывая первое значение для каждого.

w2= Window().orderBy(df.SecondsInHour)
df.withColumn("lagdiff",(F.col("Total")- F.lag("Total").over(w2)))\
.withColumn("lagdiff2", F.col("Total")- F.lag("Total").over(w2))\
.withColumn("lagdiff3", F.when((F.col("lagdiff2")>0),F.lit(0)).otherwise(F.col("lagdiff2")*-1))\
.withColumn("lagdiff4", F.when((F.col("lagdiff") <0), F.lit(0)).otherwise(F.col("lagdiff")))\
.withColumn("In", F.when(F.col("lagdiff").isNull(), F.col("Total")).otherwise(F.col("lagdiff4")))\
.withColumn("Out",F.when(F.col("lagdiff3").isNull(), F.lit(0)).otherwise(F.col("lagdiff3")))\
.orderBy(df.SecondsInHour.asc())\
.drop("lagdiff","lagdiff2","lagdiff3","lagdiff4").show()

+-------------+-----+---+---+
|SecondsInHour|Total| In|Out|
+-------------+-----+---+---+
|          100|    1|  1|  0|
|          101|    1|  0|  0|
|          102|    1|  0|  0|
|          103|    2|  1|  0|
|          104|    2|  0|  0|
|          105|   12| 10|  0|
|          106|   12|  0|  0|
|          107|    1|  0| 11|
|          108|    2|  1|  0|
|          109|   12| 10|  0|
|          110|    2|  0| 10|
|          111|    0|  0|  2|
|          112|   22| 22|  0|
|          113|   17|  0|  5|
|          114|   20|  3|  0|
+-------------+-----+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...