Я учусь использовать кадры данных Pyspark для некоторых проектов, и у меня возник вопрос по поводу создания набора результатов. Ниже приведена таблица, представляющая информацию по секундам о пользователях, вошедших в платформу SaaS. Это, очевидно, происходит от данных о входе и выходе пользователя из системы, которые я успешно могу использовать с помощью DF.
1-й столбец представляет значение секунд в часе, 2-й столбец представляет общее число пользователей, вошедших в систему, 3-й представляет нового пользователя, который вошел в систему в данную секунду, 4-й показывает пользователей, которые вышли из системы.
Например: за сотую секунду всего 1 пользователь вошел в систему и, следовательно, In = 1, Out = 0 За 105 секунд вошли 10 новых пользователей, и, таким образом, всего = 12 и In = 10 за 107 секунды , 11 существующих пользователей вышли из системы и, таким образом, Out = 11 и всего = 1
SecondsInHour total In Out
100 1 1 0
101 1 0 0
102 1 0 0
103 2 1 0
104 2 0 0
105 12 10 0
106 12 0 0
107 1 0 11
....
Моя попытка получить этот результат выглядит следующим образом:
df.groupBy('logged_seconds') \
.agg({'logged_seconds':'count', 'login_offset':'count', 'logout_offset':'count'}) \
.show()
что не правильно. Как мне получить вышеуказанные результаты? Спасибо
Обновлено для добавления кода:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, explode, udf
from pyspark.sql.types import ArrayType, IntegerType
def seconds_range(start_date,end_date):
start_seconds = start_date.minute * 60 + start_date.second
end_seconds = end_date.minute * 60 + end_date.second
return list(range(start_seconds, end_seconds+1))
def to_seconds(date):
return date.minute * 60 + date.second
spark = SparkSession.builder.appName('MyApp').master('local[2]').getOrCreate()
# register udf function with spark
seconds_range_udf = udf(seconds_range, ArrayType(IntegerType()))
to_seconds_udf = udf(to_seconds, IntegerType())
# create dataframe with sample data.
df1 = spark.createDataFrame([('user1', '2019-12-01 9:02:00', '2019-12-01 09:04:00'),\
('user2', '2019-12-01 9:02:30', '2019-12-01 09:04:00'),\
('user3', '2019-12-01 9:03:23', '2019-12-01 09:03:50')],\
['user', 'login_start_dt', 'login_end_dt'])
# assign data types to the columns
df1 = df1.select(df1.iqcckey,\
to_timestamp(df1.login_start_dt , 'yyyy-MM-dd HH:mm:ss').alias('login_start_dt '),\
to_timestamp(df1.login_end_dt, 'yyyy-MM-dd HH:mm:ss').alias('login_end_dt'))
# construct a new column that is an array of seconds logged in.
df2 = df1.\
withColumn('login_offset', to_seconds_udf('login_start_dt ')).\
withColumn('logout_offset', to_seconds_udf('login_end_dt')).\
withColumn('arr_logged_seconds', seconds_range_udf('login_start_dt ', 'login_end_dt'))
# convert the 3rd column (array) into rows
df2 = df2.withColumn('logged_seconds', explode(df2.arr_logged_seconds))
# group, count data
df2.groupBy('user','logged_seconds').agg({'logged_seconds':'count', 'login_offset':'count', 'logout_offset':'count'}).show()
print('End program')