Чтобы понять эту проблему, представьте, что группа людей в момент времени t1
находится в определенном месте L
. Вам интересно знать, где находились все эти люди t0
.
У меня есть пример кода для bootstrap упражнения:
row = Row("user_id", "start", "location_id")
df = spark.sparkContext.parallelize([
row(1, "2015-01-01 01:00:00", 100),
row(2, "2015-01-01 01:00:00", 100),
row(2, "2015-01-01 02:00:00", 100),
row(1, "2015-01-01 02:00:00", 300),
row(2, "2015-01-01 02:00:00", 300),
row(1, "2015-01-01 03:00:00", 300),
row(3, "2015-01-01 03:00:00", 300),
row(2, "2015-01-01 03:00:00", 300),
]).toDF().withColumn("timebucket_start", col("start").cast("timestamp"))
Ожидаемый результат должен быть чем-то как это:
+--------------------+--------------+-------------------+-----+
|camefrom_location_id|in_location_id| at_bucket|count|
+--------------------+--------------+-------------------+-----+
| 100| 300|2015-01-01 02:00:00| 2|
| 100| 100|2015-01-01 02:00:00| 1|
| 100| 300|2015-01-01 03:00:00| 1|
| 300| 300|2015-01-01 03:00:00| 2|
+--------------------+--------------+-------------------+-----+
Как вы можете видеть, во временном интервале 03:00:00
в местоположении 300
есть 2 человека, прибывающих из местоположения 300
(в исходных данных во время 02:00:00
оба 1
и 2
были в 300
) плюс еще один, приходящий из местоположения 100
(только 2
).
Я рассчитал это решение с помощью самостоятельного соединения:
df.createOrReplaceTempView("dataset")
res = spark.sql(f"""
SELECT
j1.location_id as camefrom_location_id,
j2.location_id as in_location_id,
j2.timebucket_start as at_bucket,
COUNT(DISTINCT j1.user_id) as count
FROM
dataset j1 INNER JOIN dataset j2
ON
j2.timebucket_start == (j1.timebucket_start + INTERVAL 1 HOUR) AND
j1.user_id == j2.user_id
GROUP BY
j1.location_id, j2.location_id, j2.timebucket_start
ORDER BY
at_bucket
""")
Я хотел бы повторить то же самое, используя функцию Window, потому что она предположительно работает быстрее (на большем наборе данных самоподключение взрывается). И здесь я борюсь с созданием этого в Spark SQL.
По сути, моя идея заключалась в том, чтобы «сгруппировать по местоположению и сегменту» (чтобы вы получили список user_id
с), а затем у вас окно на основании этого списка. Но я не совсем уверен, что вы можете открыть окно на основе списка идентификаторов. В то же время, если на этом этапе я анализирую результаты, тогда я игнорирую цель использования оконной функции ... не так ли?
Другая идея состояла в том, чтобы создать 1-часовое окно и раздел с помощью user_id. затем группа в пределах окна. Но опять же, это похоже на то, что, возможно, с помощью пользовательского UDAF может быть достигнуто?