PySpark получить значение предыдущей группы - PullRequest
0 голосов
/ 22 сентября 2018

Я пытаюсь получить предыдущее значение в той же группе, используя фрейм данных и PySpark, но я не могу заставить это работать, когда группа состоит из двух столбцов (дата и текст)

window = Window.partitionBy("date", "text").orderBy("date", "text")
df2 = df2.withColumn('prev_date', func.lag(df2['count']).over(window))

В результате:

+--------+----+-----+---+----------+
|    date|text|count|day|prev_count|
+--------+----+-----+---+----------+
|20180901| cat|    2|  1|      null|
|20180901| dog|    2|  1|      null|
|20180902| cat|    3|  2|      null|
|20180902| dog|    6|  2|      null|
|20180903| cat|    2|  3|      null|
|20180904| cat|    3|  4|      null|
|20180905| cat|    2|  5|      null|
|20180905| dog|    4|  5|      null|
+--------+----+-----+---+----------+

Желаемый результат:

+--------+----+-----+---+----------+
|    date|text|count|day|prev_count|
+--------+----+-----+---+----------+
|20180901| cat|    2|  1|      null|
|20180901| dog|    2|  1|      null|
|20180902| cat|    3|  2|         2|
|20180902| dog|    6|  2|         2|
|20180903| cat|    2|  3|         3|
|20180904| cat|    3|  4|         2|
|20180905| cat|    2|  5|         3|
|20180905| dog|    4|  5|         6|
+--------+----+-----+---+----------+

Цель состоит в том, чтобы сравнить text отсчет за один день до предыдущего дня.

Спасибо.

1 Ответ

0 голосов
/ 22 сентября 2018

Я думаю, что вы должны удалить поле "date" из оператора partitionBy.Данные уникальны с полями «дата» и «текст», так что это означает, что больше нет другой комбинации.Это причина, по которой все значения возвращают ноль

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as func
>>> 
>>> data = sc.parallelize([
...     ('20180901','cat',2,1),
...     ('20180901','dog',2,1),
...     ('20180902','cat',3,2),
...     ('20180902','dog',6,2),
...     ('20180903','cat',2,3),
...     ('20180904','cat',3,4),
...     ('20180905','cat',2,5),
...     ('20180905','dog',4,5)])
>>> 
>>> columns = ['date','text','count','day']
>>> df = spark.createDataFrame(data, columns)
>>> 
>>> window = Window.partitionBy('text').orderBy('date','text')
>>> df = df.withColumn('prev_date', func.lag('count').over(window))
>>> 
>>> df.sort('date','text').show()
+--------+----+-----+---+---------+                                             
|    date|text|count|day|prev_date|
+--------+----+-----+---+---------+
|20180901| cat|    2|  1|     null|
|20180901| dog|    2|  1|     null|
|20180902| cat|    3|  2|        2|
|20180902| dog|    6|  2|        2|
|20180903| cat|    2|  3|        3|
|20180904| cat|    3|  4|        2|
|20180905| cat|    2|  5|        3|
|20180905| dog|    4|  5|        6|
+--------+----+-----+---+---------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...