Группировка по оконной функции в (Py) Spark SQL - PullRequest
0 голосов
/ 29 февраля 2020

Чтобы понять эту проблему, представьте, что группа людей в момент времени t1 находится в определенном месте L. Вам интересно знать, где находились все эти люди t0.

У меня есть пример кода для bootstrap упражнения:

row = Row("user_id", "start", "location_id")
df = spark.sparkContext.parallelize([
    row(1, "2015-01-01 01:00:00", 100),
    row(2, "2015-01-01 01:00:00", 100),

    row(2, "2015-01-01 02:00:00", 100),
    row(1, "2015-01-01 02:00:00", 300),
    row(2, "2015-01-01 02:00:00", 300),

    row(1, "2015-01-01 03:00:00", 300),
    row(3, "2015-01-01 03:00:00", 300),
    row(2, "2015-01-01 03:00:00", 300),
]).toDF().withColumn("timebucket_start", col("start").cast("timestamp"))

Ожидаемый результат должен быть чем-то как это:

+--------------------+--------------+-------------------+-----+                 
|camefrom_location_id|in_location_id|          at_bucket|count|
+--------------------+--------------+-------------------+-----+
|                 100|           300|2015-01-01 02:00:00|    2|
|                 100|           100|2015-01-01 02:00:00|    1|
|                 100|           300|2015-01-01 03:00:00|    1|
|                 300|           300|2015-01-01 03:00:00|    2|
+--------------------+--------------+-------------------+-----+

Как вы можете видеть, во временном интервале 03:00:00 в местоположении 300 есть 2 человека, прибывающих из местоположения 300 (в исходных данных во время 02:00:00 оба 1 и 2 были в 300) плюс еще один, приходящий из местоположения 100 (только 2).

Я рассчитал это решение с помощью самостоятельного соединения:

df.createOrReplaceTempView("dataset")

res = spark.sql(f"""
SELECT
    j1.location_id as camefrom_location_id,
    j2.location_id as in_location_id,
    j2.timebucket_start as at_bucket,
    COUNT(DISTINCT j1.user_id) as count
FROM
    dataset j1 INNER JOIN dataset j2
ON
    j2.timebucket_start == (j1.timebucket_start + INTERVAL 1 HOUR) AND
    j1.user_id == j2.user_id
GROUP BY
    j1.location_id, j2.location_id, j2.timebucket_start
ORDER BY
    at_bucket
""")

Я хотел бы повторить то же самое, используя функцию Window, потому что она предположительно работает быстрее (на большем наборе данных самоподключение взрывается). И здесь я борюсь с созданием этого в Spark SQL.

По сути, моя идея заключалась в том, чтобы «сгруппировать по местоположению и сегменту» (чтобы вы получили список user_id с), а затем у вас окно на основании этого списка. Но я не совсем уверен, что вы можете открыть окно на основе списка идентификаторов. В то же время, если на этом этапе я анализирую результаты, тогда я игнорирую цель использования оконной функции ... не так ли?

Другая идея состояла в том, чтобы создать 1-часовое окно и раздел с помощью user_id. затем группа в пределах окна. Но опять же, это похоже на то, что, возможно, с помощью пользовательского UDAF может быть достигнуто?

1 Ответ

0 голосов
/ 29 февраля 2020

Ваши данные неверны для нужного вам результата, или результат неверен для данных, которые вы предоставили. В зависимости от того, как вы на это смотрите. Например, идентификатор 3 входит в первую запись в строке (3, «2015-01-01 03:00:00», 300), поэтому он не имеет местоположения «откуда». Таким образом, только два идентификатора в "2015-01-01 03:00:00", 300 "имеют исходное местоположение, но ожидаемый результат показывает 3, счет 2 для 300-300 и счет 1 100-300.

Я оставлю вас, чтобы переименовать столбцы, но это примерно то, что я бы сделал для достижения этой цели:

w = Window.partitionBy('user_id').orderBy('start'))
cols = 'camefrom_location_id','location_id','timebucket_start'
(df.withColumn(
  'from',
# in a new col get the previous entry/location for each id
  F.lag('location_id').over(w))
#drop nulls i.e first entry for every id
 .na.drop()
#now group and count the entries per 'from + current location and start 
#bucket'
#drop duplicate entries for the same id in 02:00:00
 .groupBy(cols +[id]).agg(F.last('location_id'))
#aggregate the final result
 .groupBy(cols).agg(F.count('start'))
).show()

Я бы предложил дважды проверить данные в вопросе должным образом, прежде чем задавать вопрос, это будет увеличьте вероятность получения хорошего ответа.

...