Объединить дублирующиеся записи в одну запись в фрейме данных pyspark - PullRequest
0 голосов
/ 21 декабря 2018

У меня есть фрейм данных с дублирующимися строками, и я хотел бы объединить их в одну запись со всеми различными столбцами.

Мой пример кода выглядит следующим образом:

df1= sqlContext.createDataFrame([("81A01","TERR NAME 01","NJ","",""),("81A01","TERR NAME 01","","NY",""),("81A01","TERR NAME 01","","","LA"),("81A02","TERR NAME 01","CA","",""),("81A02","TERR NAME 01","","","NY")], ["zip_code","territory_name","state","state1","state2"])

результирующий кадр данных выглядит следующим образом:

df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|      |      |
|   81A01|  TERR NAME 01|     |    NY|      |
|   81A01|  TERR NAME 01|     |      |    LA|
|   81A02|  TERR NAME 01|   CA|      |      |
|   81A02|  TERR NAME 01|     |      |    NY|
+--------+--------------+-----+------+------+

Мне нужно объединить / объединить дубликаты записей на основе zip_code и получить все различные значения состояния в одной строке.

Ожидаемый результат:

+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
|   81A02|  TERR NAME 01|   CA|      |    LA|
+--------+--------------+-----+------+------+

Я новичок в pyspark и не знаю, как использовать группы / объединения.Может кто-нибудь, пожалуйста, помогите с кодом.

Ответы [ 2 ]

0 голосов
/ 21 декабря 2018

Примечание: Для любой уникальной записи zip_code и territory_name, если под любым из столбцов состояния есть несколько записей, тогда они будут concatenated.

Некоторое объяснение: В этом коде я использую RDDs.Сначала я делю каждую запись на две tuples, с tuple1 как key и tuple2 как value.Затем я уменьшаю на key.x соответствует tuple1 из (zip_code, territory_name), а tuple2 содержит 3 столбца состояния.tuple1 принимается за key, потому что мы хотим group by различные значения zip_code и territory_name.Итак, каждая отдельная пара, такая как (81A01,TERR NAME 01), (81A02,TERR NAME 01), является key, на основе которой мы reduce.Reduce означает, что нужно принимать каждые два значения за один раз и делать некоторые operation для него, а затем повторять то же самое operation с этим результатом и следующим элементом, пока весь кортеж не будет исчерпан.

Итак,уменьшение (1,2,3,4,5) с + operation будет - 1+2=3, затем 3+3=6 и выполнение + operation до достижения последнего элемента.Таким образом, 6+4=10 и, наконец, 10+5=15.Так как кортеж закончился на 5, результат равен 15. Вот как reduce работает с операцией +.Так как здесь у нас strings, а не numbers, поэтому конкатенация произойдет A+B=AB.

df1=df1.rdd.map(lambda r: ((r.zip_code, r.territory_name), (r.state, r.state1, r.state2)))\
       .reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))\
       .map(lambda r: (r[0][0],r[0][1],r[1][0],r[1][1],r[1][2]))\
       .toDF(["zip_code","territory_name","state","state1","state2"])
df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
|   81A02|  TERR NAME 01|   CA|      |    NY|
+--------+--------------+-----+------+------+
0 голосов
/ 21 декабря 2018

, если вы уверены, что для каждой комбинации территории zip_code есть только 1 штат, 1 штат1 и 1 штат2, вы можете использовать следующий код.Функция max использует строку, если в сгруппированных данных есть строка, поскольку непустая строка имеет более высокое значение (вероятно, ASCII), чем пустая строка ""

from pyspark.sql.types import *
from pyspark.sql.functions import *
df1= sqlContext.createDataFrame([("81A01","TERR NAME 01","NJ","",""),("81A01","TERR NAME 01","","NY",""),("81A01","TERR NAME 01","","","LA"),("81A02","TERR NAME 01","CA","",""),("81A02","TERR NAME 01","","","NY")], ["zip_code","territory_name","state","state1","state2"])

df1.groupBy("zip_code","territory_name").agg(max("state").alias("state"),max("state1").alias("state1"),max("state2").alias("state2")).show()

Результат:

+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A02|  TERR NAME 01|   CA|      |    NY|
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
+--------+--------------+-----+------+------+
...