У меня есть датафрейм с логинами пользователей. Они входят в систему несколько раз в час.
Для каждого окна в течение 1 часа, я хочу, чтобы среднее значение между дельтой каждого логина
[Row(Day='2018-05-09', User='9999', login_hour='2018-05-09 09', login_timestamps=[1525859759, 1525859759, 1525859761, 1525859767, 1525859767, 1525859885]),
Row(Day='2018-05-08', User='9999', login_hour='2018-05-08 12', login_timestamps=[1525783439, 1525783439, 1525783439, 1525783439, 1525783452, 1525783452, 1525783453, 1525783453, 1525783458, 1525783506, 1525783506, 1525783508, 1525783508, 1525783510, 1525783510, 1525783512, 1525783512, 1525783513, 1525783513, 1525783514, 1525783514, 1525783515, 1525783515, 1525783516, 1525783516, 1525783844, 1525783844, 1525783845]),
- восстановить эти записи в login_timestamps и желательно из последних
1, т.е. в этом примере операция 1 будет последней записью,
1525859885- (минус) предыдущий или 1525859767, что дает нам 118
мс, и так с самого начала, так что у нас будет список
Разница между временными метками , и тогда мы можем вычислить среднее значение. Но
как это сделать в искре?
- и вторая проблема: каждый час
день, он должен быть сохранен в другом столбце , т.е. я должен создать
24 разных столбца, которые есть каждый час в этот день и добавляют к ним отличия от данного часа
чтобы получить это, я сделал:
login_list = df.withColumn('login_hour', F.date_format('Date','yyyy-MM-dd HH'))\
.groupBy('Day', 'User', 'login_hour')\
.agg(F.sort_array(F.collect_list('ActivityDay')).alias('login_timestamps'))
Вывод должен выглядеть примерно так:
Row(Day='2018-05-09', User='9999', login_hour='2018-05-09 09', login_timestamps=[1525859759, 1525859759, 1525859761, 1525859767, 1525859767, 1525859885]), login_at_0=[200, 245], login_at_1=[60], [...], login_at_9=[118, 6], etc.