Рассмотрим следующий фрейм данных pyspark:
df = sqlContext.createDataFrame(
[
('2019-05-08 11:00:00', 'a'),
('2019-05-08 11:02:12', 'b'),
('2019-05-08 11:04:24', 'a'),
('2019-05-08 11:06:36', 'c'),
('2019-05-08 11:08:48', 'c'),
('2019-05-08 11:11:00', 'a'),
('2019-05-08 11:13:12', 'v'),
('2019-05-08 11:23:34', 'd'),
('2019-05-08 11:26:24', 'e'),
('2019-05-08 11:28:36', 'c'),
('2019-05-08 11:30:48', 'b'),
('2019-05-08 11:35:12', 'b'),
('2019-05-08 11:37:24', 'b'),
('2019-05-08 11:44:00', 'a'),
('2019-05-08 11:48:24', 'x'),
('2019-05-08 11:50:36', 'k'),
('2019-05-08 11:55:00', 'b'),
('2019-05-08 12:01:36', 'c')
],
('datetime', 'value')
)
Что я пытаюсь (эффективно) сделать, так это найти скорость различного value
с течением времени для 30-минутных окон, открывающихся каждые 5 минут,Таким образом, в основном мне нужно найти скорость (countDistinct(value) / (datetime.max() - datetime.min())
) по временным окнам и дать в результате:
- 11: 00 - 11:30 - 6/1716 (a, b,e, d, c, v / (2019-05-08 11:28:36 - 2019-05-08 11:00:00 в секундах))
- 11: 05 - 11:35 - 6/ 1452 (a, b, e, d, c, v / (2019-05-08 11:30:48 - 2019-05-08 11:06:36 в секундах))
- 11: 10- 11: 40
- 11: 15 - 11: 45
и т. Д. *
Я попытался перейти к оконной функции, с которойУ меня был некоторый успех для отдельного счета (который не поддерживается, поэтому я пошел с F.size(F.collect_set('value').over(w))
), но я не мог сделать это для пользовательской функции.Я тоже попробовал UDF, но опять не повезло.