пересечение pyspark из двух столбцов датафреймов, находящихся в типе списка - PullRequest
0 голосов
/ 28 августа 2018

У меня есть фреймы данных со столбцами местоположений, и каждая ячейка содержит список названия страны, я хочу найти общее название страны из обоих столбцов и добавить его в выходной фрейм данных ..... кодирование в pyspark .....

df_input = spark.createDataFrame([
(100001,12301, 'India', ['India', 'USA','Germany']), (100002, 12302, 
'Germany', ['India', 'UK','Germany']), 
(100003,12303,'Taiwan',['India','Japan','China'])], ("pos_id","emp_id", 
"e_location", "p_location"))

Ввод данных:

+------+------+----------+--------------------+
|pos_id|emp_id|e_location|          p_location|
+------+------+----------+--------------------+
|100001| 12301|     India|[India, USA, Germ...|
|100002| 12302|   Germany|[India, UK, Germany]|
|100003| 12303|    Taiwan|[India, Japan, Ch...|
+------+------+----------+--------------------+

Теперь я хочу пересечение между двумя, показанными в Output DF.

Выходной кадр данных

+------+---------+----------------+
|emp_id|   pos_id| matched_country|
+------+---------+------+---------+
| 12301|   100001|           India|
| 12302|   100002|         Germany|
| 12303|   100003|            None|
+------+---------+----------------+

1 Ответ

0 голосов
/ 30 августа 2018

Я предполагаю, что ваш df_ploc кадр данных содержит список стран в столбце p_location. Тогда вы могли бы использовать что-то подобное для создания пересечения и сохранения всех комбинаций pos_id и emp_id.

Я изменил исходный фрагмент кода из-за пропущенных скобок и предположения о списке (в противном случае вы должны включить метод split.

df_ploc = spark.createDataFrame([
(000001, ['India', 'USA','Germany']), (000002, ['India', 'UK','Germany']), 
(000003,['India','Japan','China'])
], ("pos_id", "p_location"))

df_eloc = spark.createDataFrame([
(12301, 'India'), (12302, 'Germany'), (12303,'Taiwan')
], ("emp_id", "e_location"))

#create a new dataframe with one line per country
df_new = df_ploc.select("pos_id",explode("p_location").alias("new_location"))
df_eloc.join(df_new, df_new["new_location"] == df_eloc["e_location"], how="inner").show()

Вывод выглядит так:

+------+----------+------+------------+
|emp_id|e_location|pos_id|new_location|
+------+----------+------+------------+
| 12302|   Germany|     1|     Germany|
| 12302|   Germany|     2|     Germany|
| 12301|     India|     3|       India|
| 12301|     India|     1|       India|
| 12301|     India|     2|       India|
+------+----------+------+------------+

Модифицированный Join может выглядеть как

df_eloc.join(df_new, df_new["new_location"] == df_eloc["e_location"], how="left").groupBy("emp_id","new_location").agg(min("pos_id")).show()

Что выводит что-то вроде:

+------+------------+-----------+
|emp_id|new_location|min(pos_id)|
+------+------------+-----------+
| 12301|       India|          1|
| 12302|     Germany|          1|
| 12303|        null|       null|
+------+------------+-----------+

Если ваш pos_id должен быть просто перечислением (например, 1,2,3,4,5 ....), вы можете использовать некоторые функции, например row_number, для создания этого столбца. E.g.:

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window as W

df1 = df_eloc.join(df_new, df_new["new_location"] == df_eloc["e_location"], how="left").groupBy("emp_id","new_location").agg(min("pos_id"))
df1 = df1.withColumn("idx", monotonically_increasing_id())
windowSpec = W.orderBy("idx")
df1.withColumn("pos_id", row_number().over(windowSpec)).select("emp_id","pos_id","new_location").show()

Выход:

+------+------+------------+
|emp_id|pos_id|new_location|
+------+------+------------+
| 12301|     1|       India|
| 12302|     2|     Germany|
| 12303|     3|        null|
+------+------+------------+
...