Транспонировать строки в столбцы - PullRequest
0 голосов
/ 15 октября 2019

У меня есть требование переместить строки в столбцы. Есть 2 таблицы (показано ниже). Каждая запись в таблице product соответствует записям 0 или 1 или 2 в таблице product_segment. Там может быть 2 типа продуктов - HOS & AMB. Требуется заполнить значения «сегмента» в соответствующие им 2 столбца (1 для HOS и 1 для AM) в целевом объекте на основе этого типа продукта.

Заполните значение для этого сегмента HOS_segment или AMB_segment в целевом объекте на основе того, какая соответствующая запись product-type существует в источнике. Если присутствуют оба типа записей, заполните оба поля в выходных данных. Или заполняйте существующее.

Допустим, что таблицы:

Продукт:

product_id | eff_date
12345 | 10/01/2018
75852 | 22/05/2018
33995 | 15/02/2019

product_segment:

product_id | segment | type
12345 | KA | HOS
12345 | HM | AMB
75852 | GB | HOS
33995 | HD | AMB

Ожидаемый результат:

product_id | eff_date | HOS_segment | AMB_segment
12345 | 10/01/2018 | KA | HM
75852 | 22/05/2018 | GB | Null
33995 | 15/02/2019 | Null | HD

Для product 12345 существуют записи как HOS, так и AMB, поэтому в выходных данных оба столбца заполняются соответствующими сегментами.
Для product 75852 существует только запись HOS, следовательно, HOS_segment заполняется, но AMB_segment получает Null
И, наконец, для product 33995 происходит прямо противоположное. Заполняется AMB_segment, но HOS_segment получает значение Null

Может кто-нибудь помочь мне решить эту проблему

Ответы [ 2 ]

1 голос
/ 15 октября 2019

вместо использования соединений и где я бы предложил одиночное соединение с опорой. вот фрагмент кода посмотреть.

>>> import pyspark.sql.functions as F
>>> df1= spark.createDataFrame([[12345,"10/01/2018"],[75852,"10/01/2018"],[33995,"10/01/2018"]],["product_id","eff_date"])
>>> df1.show()
+----------+----------+                                                         
|product_id|  eff_date|
+----------+----------+
|     12345|10/01/2018|
|     75852|10/01/2018|
|     33995|10/01/2018|
+----------+----------+

>>> df2 = spark.createDataFrame([[12345,"KA","HOS"],[12345,"HM","AMB"],[75852,"GB","HOS"],[33995,"HD","AMB"]],["product_id","Segment","type"])
>>> df2.show()

+----------+-------+----+
|product_id|Segment|type|
+----------+-------+----+
|     12345|     KA| HOS|
|     12345|     HM| AMB|
|     75852|     GB| HOS|
|     33995|     HD| AMB|
+----------+-------+----+

>>> df1.join(df2,df1.product_id ==df2.product_id,"inner").groupBy(df2.product_id,df1.eff_date).pivot("type").agg(F.first(df2.Segment)).show()
+----------+----------+----+----+
|product_id|  eff_date| AMB| HOS|
+----------+----------+----+----+
|     12345|10/01/2018|  HM|  KA|
|     33995|10/01/2018|  HD|null|
|     75852|10/01/2018|null|  GB|
+----------+----------+----+----+

Spark-SQL 2.4 +

>>> df1.registerTempTable("df1_temp")
>>> df2.registerTempTable("df2_temp")
>>> spark.sql("select * from(select a.*,b.segment,b.type from df1_temp a inner join df2_temp b on a.product_id =b.product_id) PIVOT( first(segment) for type in ('HOS' HOS_segment,'AMB' AMB_Segment )) " ).show()
+----------+----------+-----------+-----------+
|product_id|  eff_date|HOS_segment|AMB_Segment|
+----------+----------+-----------+-----------+
|     12345|10/01/2018|         KA|         HM|
|     33995|10/01/2018|       null|         HD|
|     75852|10/01/2018|         GB|       null|
+----------+----------+-----------+-----------+

Я надеюсь, что этопоможет вам. дайте мне знать, если у вас есть какие-либо вопросы, связанные с тем же.

0 голосов
/ 15 октября 2019

Вы можете использовать объединение с отфильтрованной таблицей сегментов.

import pyspark.sql.functions as F

product \
.join(product_segment.where("type = 'HOS'").select("product_id", F.col("segment").alias("HOS_segment")), "product_id", "left_outer") \
.join(product_segment.where("type = 'AMB'").select("product_id", F.col("segment").alias("AMB_segment")), "product_id", "left_outer")
...