В основном я создал 2 массива col4 и col5 , а затем использовал map_from_arrays для создания карты, а затем создал столбец из этих col1, col2, col3, используя карту и затем использовал когда, иначе ( когда isNotNull ) изменяет ваши столбцы на месте.
( spark2.4 + )
Данные
df.show()
+----+----+----+---------+----+
|col1|col2|col3| col4|col5|
+----+----+----+---------+----+
| A| B| C|col2,col3| X,Y|
| P| Q| R| col1| Z|
| I| J| K|col1,col3| S,T|
+----+----+----+---------+----+
% scala
import org.apache.spark.sql.functions.{col, map_from_arrays, split, when}
df.withColumn("col6", map_from_arrays(split($"col4",","),split($"col5",","))).drop("col4","col5")
.select($"col1",$"col2",$"col3",col("col6.col1").alias("col1_"),col("col6.col2").alias("col2_"),col("col6.col3").alias("col3_"))
.withColumn("col1", when(col("col1_").isNotNull, col("col1_")).otherwise($"col1"))
.withColumn("col2", when(col("col2_").isNotNull,col("col2_")).otherwise($"col2"))
.withColumn("col3",when(col("col3_").isNotNull,col("col3_")).otherwise($"col3"))
.drop("col1_","col2_","col3_")
.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| A| X| Y|
| Z| Q| R|
| S| J| T|
+----+----+----+
% python
from pyspark.sql import functions as F
df.withColumn("col6", F.map_from_arrays(F.split("col4",','),F.split("col5",','))).drop("col4","col5")\
.select("col1","col2","col3",F.col("col6.col1").alias("col1_"),F.col("col6.col2").alias("col2_"),F.col("col6.col3").alias("col3_"))\
.withColumn("col1", F.when(F.col("col1_").isNotNull(), F.col("col1_")).otherwise(F.col("col1")))\
.withColumn("col2", F.when(F.col("col2_").isNotNull(),F.col("col2_")).otherwise(F.col("col2")))\
.withColumn("col3",F.when(F.col("col3_").isNotNull(),F.col("col3_")).otherwise(F.col("col3")))\
.drop("col1_","col2_","col3_")\
.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| A| X| Y|
| Z| Q| R|
| S| J| T|
+----+----+----+
ОБНОВЛЕНИЕ: Это будет работать для spark 2.0 + ( без map_from_array ):
( Вы можете сделать scala udf и применить аналогичные логи c, надеюсь, это поможет)
% python
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
@udf("map<string,string>")
def as_dict(x):
return dict(zip(*x)) if x else None
df.withColumn("col6", F.array(F.split(("col4"),','),F.split(("col5"),','))).drop("col4","col5")\
.withColumn("col6", as_dict("col6")).select("col1","col2","col3",F.col("col6.col1").alias("col1_"),F.col("col6.col2").alias("col2_"),F.col("col6.col3").alias("col3_"))\
.withColumn("col1", F.when(F.col("col1_").isNotNull(), F.col("col1_")).otherwise(F.col("col1")))\
.withColumn("col2", F.when(F.col("col2_").isNotNull(),F.col("col2_")).otherwise(F.col("col2")))\
.withColumn("col3",F.when(F.col("col3_").isNotNull(),F.col("col3_")).otherwise(F.col("col3")))\
.drop("col1_","col2_","col3_")\
.show()