Добавление динамических столбцов на основе предыдущих значений строк с помощью PySpark? - PullRequest
0 голосов
/ 19 мая 2019

В моем случае у меня есть датафрейм, который показывает «Дни» по горизонтали, а в столбцах - проданные единицы каждого часа. Тем не менее, я также хотел бы показать 26 часов. Первые два часа предыдущего дня следует использовать в качестве значений и добавить в качестве столбцов «24» и «25».

Вот как выглядит кадр:

| Day | 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| Yesterday| |2012-01-04| 3|null|null|null|null|null|null|null| 1|null| 3|null|null| 2| 4| 2| 4| 2| 2| 2| 4| 1| 1| 2|2012-01-03| |2012-01-05|null|null|null|null|null| 1| 1| 36| 136| 65| 1| 8| 2| 4|null| 3| 2| 11| 2| 6| 5| 2|null|null|2012-01-04| |2012-01-06|null| 1|null|null|null| 1| 6| 32| 118| 88| 6| 1| 2| 2| 2| 6| 4| 3| 5| 4| 1| 3| 1|null|2012-01-05| |2012-01-07| 1|null|null|null|null|null| 4| 39| 128| 65| 3| 3| 7| 1| 4| 1| 4| 3| 4| 6| 1| 3| 1| 2|2012-01-06|

Я уже пытался связать данные с предыдущим днем ​​через left-join, но Spark каждый раз выдает мне сообщение об ошибке:

AnalysisException: u'Detected implicit cartesian product for LEFT OUTER join between logical plans

Объединение, связывающее данные с предыдущим днем:

df = df.alias("a").join(df, df["Yesterday"] == df["Day"], how="left").select("a.*", df["Day"].alias("Day1"))

Обычное объединение, похоже, не решает этого. Как я могу легко добавить столбцы, которые принимают значения строк предыдущего дня? Так что-то вроде shift.

1 Ответ

0 голосов
/ 19 мая 2019

Создание фрейма данных, содержащего данные из вашего примера:

>>> from pyspark.sql.types import StructType, StructField, LongType, StringType
>>> from pyspark.sql.functions import col
>>> data = [('2012-01-04',3,None,None,None,None,None,None,None,1,None,3,None,None,2,4,2,4,2,2,2,4,1,1,2,'2012-01-03'),
...         ('2012-01-05',None,None,None,None,None,1,1,36,136,65,1,8,2,4,None,3,2,11,2,6,5,2,None,None,'2012-01-04'),
...         ('2012-01-06',None,1,None,None,None,1,6,32,118,88,6,1,2,2,2,6,4,3,5,4,1,3,1,None,'2012-01-05'),
...         ('2012-01-07',1,None,None,None,None,None,4,39,128,65,3,3,7,1,4,1,4,3,4,6,1,3,1,2,'2012-01-06')]
>>> columns = ['Day','0','1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20','21','22','23','Yesterday']
>>> schema = StructType([StructField(c, LongType() if c.isdigit() else StringType(), True) for c in columns])
>>> df = spark.createDataFrame(data, schema)
>>> df.show(5, False)
+----------+----+----+----+----+----+----+----+----+---+----+---+----+----+---+----+---+---+---+---+---+---+---+----+----+----------+
|Day       |0   |1   |2   |3   |4   |5   |6   |7   |8  |9   |10 |11  |12  |13 |14  |15 |16 |17 |18 |19 |20 |21 |22  |23  |Yesterday |
+----------+----+----+----+----+----+----+----+----+---+----+---+----+----+---+----+---+---+---+---+---+---+---+----+----+----------+
|2012-01-04|3   |null|null|null|null|null|null|null|1  |null|3  |null|null|2  |4   |2  |4  |2  |2  |2  |4  |1  |1   |2   |2012-01-03|
|2012-01-05|null|null|null|null|null|1   |1   |36  |136|65  |1  |8   |2   |4  |null|3  |2  |11 |2  |6  |5  |2  |null|null|2012-01-04|
|2012-01-06|null|1   |null|null|null|1   |6   |32  |118|88  |6  |1   |2   |2  |2   |6  |4  |3  |5  |4  |1  |3  |1   |null|2012-01-05|
|2012-01-07|1   |null|null|null|null|null|4   |39  |128|65  |3  |3   |7   |1  |4   |1  |4  |3  |4  |6  |1  |3  |1   |2   |2012-01-06|
+----------+----+----+----+----+----+----+----+----+---+----+---+----+----+---+----+---+---+---+---+---+---+---+----+----+----------+

СЛЕДУЕТ присоединиться к фрейму данных и добавить два первых часа вчерашнего дня в виде часов «24» и «25»:

>>> newdf = df.alias('l').join(df.alias('r'), col('l.Yesterday') == col('r.Day'), how='left').select(col('l.*'), col('r.0').alias('24'), col('r.1').alias('25'))
>>> newdf.show(5, False)
+----------+----+----+----+----+----+----+----+----+---+----+---+----+----+---+----+---+---+---+---+---+---+---+----+----+----------+----+----+
|Day       |0   |1   |2   |3   |4   |5   |6   |7   |8  |9   |10 |11  |12  |13 |14  |15 |16 |17 |18 |19 |20 |21 |22  |23  |Yesterday |24  |25  |
+----------+----+----+----+----+----+----+----+----+---+----+---+----+----+---+----+---+---+---+---+---+---+---+----+----+----------+----+----+
|2012-01-07|1   |null|null|null|null|null|4   |39  |128|65  |3  |3   |7   |1  |4   |1  |4  |3  |4  |6  |1  |3  |1   |2   |2012-01-06|null|1   |
|2012-01-04|3   |null|null|null|null|null|null|null|1  |null|3  |null|null|2  |4   |2  |4  |2  |2  |2  |4  |1  |1   |2   |2012-01-03|null|null|
|2012-01-06|null|1   |null|null|null|1   |6   |32  |118|88  |6  |1   |2   |2  |2   |6  |4  |3  |5  |4  |1  |3  |1   |null|2012-01-05|null|null|
|2012-01-05|null|null|null|null|null|1   |1   |36  |136|65  |1  |8   |2   |4  |null|3  |2  |11 |2  |6  |5  |2  |null|null|2012-01-04|3   |null|
+----------+----+----+----+----+----+----+----+----+---+----+---+----+----+---+----+---+---+---+---+---+---+---+----+----+----------+----+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...