Я предполагаю, что ваш df_ploc
кадр данных содержит список стран в столбце p_location
. Тогда вы могли бы использовать что-то подобное для создания пересечения и сохранения всех комбинаций pos_id
и emp_id
.
Я изменил исходный фрагмент кода из-за пропущенных скобок и предположения о списке (в противном случае вы должны включить метод split
.
df_ploc = spark.createDataFrame([
(000001, ['India', 'USA','Germany']), (000002, ['India', 'UK','Germany']),
(000003,['India','Japan','China'])
], ("pos_id", "p_location"))
df_eloc = spark.createDataFrame([
(12301, 'India'), (12302, 'Germany'), (12303,'Taiwan')
], ("emp_id", "e_location"))
#create a new dataframe with one line per country
df_new = df_ploc.select("pos_id",explode("p_location").alias("new_location"))
df_eloc.join(df_new, df_new["new_location"] == df_eloc["e_location"], how="inner").show()
Вывод выглядит так:
+------+----------+------+------------+
|emp_id|e_location|pos_id|new_location|
+------+----------+------+------------+
| 12302| Germany| 1| Germany|
| 12302| Germany| 2| Germany|
| 12301| India| 3| India|
| 12301| India| 1| India|
| 12301| India| 2| India|
+------+----------+------+------------+
Модифицированный Join может выглядеть как
df_eloc.join(df_new, df_new["new_location"] == df_eloc["e_location"], how="left").groupBy("emp_id","new_location").agg(min("pos_id")).show()
Что выводит что-то вроде:
+------+------------+-----------+
|emp_id|new_location|min(pos_id)|
+------+------------+-----------+
| 12301| India| 1|
| 12302| Germany| 1|
| 12303| null| null|
+------+------------+-----------+
Если ваш pos_id должен быть просто перечислением (например, 1,2,3,4,5 ....), вы можете использовать некоторые функции, например row_number
, для создания этого столбца.
E.g.:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window as W
df1 = df_eloc.join(df_new, df_new["new_location"] == df_eloc["e_location"], how="left").groupBy("emp_id","new_location").agg(min("pos_id"))
df1 = df1.withColumn("idx", monotonically_increasing_id())
windowSpec = W.orderBy("idx")
df1.withColumn("pos_id", row_number().over(windowSpec)).select("emp_id","pos_id","new_location").show()
Выход:
+------+------+------------+
|emp_id|pos_id|new_location|
+------+------+------------+
| 12301| 1| India|
| 12302| 2| Germany|
| 12303| 3| null|
+------+------+------------+