Предполагая, что мы можем использовать id для объединения этих двух наборов данных, я не думаю, что есть необходимость в UDF. Эту проблему можно решить, просто используя функции внутреннего объединения, функции array и array_remove .
Сначала давайте создадим два набора данных:
df1 = spark.createDataFrame([
[1, "ABC", 5000, "US"],
[2, "DEF", 4000, "UK"],
[3, "GHI", 3000, "JPN"],
[4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])
df2 = spark.createDataFrame([
[1, "ABC", 5000, "US"],
[2, "DEF", 4000, "CAN"],
[3, "GHI", 3500, "JPN"],
[4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])
Сначала мы выполняем внутреннее соединение между двумя наборами данных, затем генерируем условие df1[col] != df2[col]
для каждого столбца, кроме id
. Когда столбцы не равны, мы возвращаем имя столбца, в противном случае - пустую строку. Список условий будет состоять из элементов массива, из которого мы окончательно удаляем пустые элементы:
from pyspark.sql.functions import col, array, when, array_remove
# get conditions for all columns except id
conditions_ = [when(df1[c]!=df2[c], c).otherwise("") for c in df1.columns if c != 'id']
select_expr =[
col("id"),
*[df2[c] for c in df2.columns if c != 'id'],
array_remove(array(*conditions_), "").alias("column_names")
]
df1.join(df2, "id").select(*select_expr).show()
# +---+-----+----+-------+------------+
# | id| name| sal|Address|column_names|
# +---+-----+----+-------+------------+
# | 1| ABC|5000| US| []|
# | 3| GHI|3500| JPN| [sal]|
# | 2| DEF|4000| CAN| [Address]|
# | 4|JKL_M|4800| CHN| [name, sal]|
# +---+-----+----+-------+------------+