Столбец Spark DataFrame со списком других столбцов, разделенных запятыми, который необходимо обновить, указав значения в другом столбце. - PullRequest
2 голосов
/ 12 марта 2020

У меня есть вариант использования, который я пытаюсь решить в Spark DataFrames. Столбец «col4» - это разделенная запятыми строка, состоящая из имен других столбцов, которые должны быть обновлены с помощью строковых значений, указанных в столбце col5.

+----+----+----+---------+----+
|col1|col2|col3|     col4|col5|
+----+----+----+---------+----+
|   A|   B|   C|col2,col3| X,Y|
|   P|   Q|   R|     col1|   Z|
|   I|   J|   K|col1,col3| S,T|
+----+----+----+---------+----+

После преобразования - Результирующий DataFrame должен выглядеть следующим образом. Как мне этого добиться?

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   X|   Y|
|   Z|   Q|   R|
|   S|   J|   T|
+----+----+----+

Ответы [ 3 ]

1 голос
/ 12 марта 2020

В основном я создал 2 массива col4 и col5 , а затем использовал map_from_arrays для создания карты, а затем создал столбец из этих col1, col2, col3, используя карту и затем использовал когда, иначе ( когда isNotNull ) изменяет ваши столбцы на месте.

( spark2.4 + )

Данные

df.show()

+----+----+----+---------+----+
|col1|col2|col3|     col4|col5|
+----+----+----+---------+----+
|   A|   B|   C|col2,col3| X,Y|
|   P|   Q|   R|     col1|   Z|
|   I|   J|   K|col1,col3| S,T|
+----+----+----+---------+----+

% scala

import org.apache.spark.sql.functions.{col, map_from_arrays, split, when}

df.withColumn("col6", map_from_arrays(split($"col4",","),split($"col5",","))).drop("col4","col5")
.select($"col1",$"col2",$"col3",col("col6.col1").alias("col1_"),col("col6.col2").alias("col2_"),col("col6.col3").alias("col3_"))
.withColumn("col1", when(col("col1_").isNotNull, col("col1_")).otherwise($"col1"))
.withColumn("col2", when(col("col2_").isNotNull,col("col2_")).otherwise($"col2"))
.withColumn("col3",when(col("col3_").isNotNull,col("col3_")).otherwise($"col3"))
.drop("col1_","col2_","col3_")
.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   X|   Y|
|   Z|   Q|   R|
|   S|   J|   T|
+----+----+----+  

% python

from pyspark.sql import functions as F

df.withColumn("col6", F.map_from_arrays(F.split("col4",','),F.split("col5",','))).drop("col4","col5")\
.select("col1","col2","col3",F.col("col6.col1").alias("col1_"),F.col("col6.col2").alias("col2_"),F.col("col6.col3").alias("col3_"))\
.withColumn("col1", F.when(F.col("col1_").isNotNull(), F.col("col1_")).otherwise(F.col("col1")))\
.withColumn("col2", F.when(F.col("col2_").isNotNull(),F.col("col2_")).otherwise(F.col("col2")))\
.withColumn("col3",F.when(F.col("col3_").isNotNull(),F.col("col3_")).otherwise(F.col("col3")))\
.drop("col1_","col2_","col3_")\
.show()


+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   X|   Y|
|   Z|   Q|   R|
|   S|   J|   T|
+----+----+----+

ОБНОВЛЕНИЕ: Это будет работать для spark 2.0 + ( без map_from_array ):

( Вы можете сделать scala udf и применить аналогичные логи c, надеюсь, это поможет)

% python

from pyspark.sql import functions as F
from pyspark.sql.functions import udf


@udf("map<string,string>")
def as_dict(x):
    return dict(zip(*x)) if x else None


df.withColumn("col6", F.array(F.split(("col4"),','),F.split(("col5"),','))).drop("col4","col5")\
.withColumn("col6", as_dict("col6")).select("col1","col2","col3",F.col("col6.col1").alias("col1_"),F.col("col6.col2").alias("col2_"),F.col("col6.col3").alias("col3_"))\
.withColumn("col1", F.when(F.col("col1_").isNotNull(), F.col("col1_")).otherwise(F.col("col1")))\
.withColumn("col2", F.when(F.col("col2_").isNotNull(),F.col("col2_")).otherwise(F.col("col2")))\
.withColumn("col3",F.when(F.col("col3_").isNotNull(),F.col("col3_")).otherwise(F.col("col3")))\
.drop("col1_","col2_","col3_")\
.show()
0 голосов
/ 13 марта 2020

Эта проблема может быть легко решена с помощью функции map RDD:

import org.apache.spark.sql.types.{StructType, StructField, StringType}

val targetColumns = df.columns.take(3) // we assume that the final df should contain 3 first elements. If not feel free to modify this accordingly to your requirements

val updatedRDD = df.rdd.map{ r => 
  val keys = r.getAs[String]("col4").split(",")
  val values = r.getAs[String]("col5").split(",")
  val mapping = keys.zip(values).toMap[String, String] // i.e: Map(col2 -> X, col3 -> Y)

  val updatedValues = targetColumns.map{c =>   
    if(keys.contains(c))
      mapping(c)
    else
      r.getAs[String](c)
  }

  Row(updatedValues:_*)
}

val schema = StructType(targetColumns.map{c => StructField(c, StringType, true)})
spark.createDataFrame(updatedRDD, schema).show(false)

// +----+----+----+
// |col1|col2|col3|
// +----+----+----+
// |A   |X   |Y   |
// |Z   |Q   |R   |
// |S   |J   |T   |
// +----+----+----+

Мы создаем карту, используя col4->keys, col5->values, которая используется для создания окончательного Row, который будет вернулся.

0 голосов
/ 13 марта 2020

Spark 2,4 +

Если столбцов не только 3, то он должен масштабироваться для большего количества столбцов. Я сделал этот код легко расширяемым.

val cols = Seq("col1", "col2", "col3")

val df1 = df.withColumn("id", monotonically_increasing_id)
val df2 = cols.foldLeft(
    df1.withColumn("col6", explode(arrays_zip(split($"col4", ","),split($"col5", ","))))
             .groupBy("id").pivot($"col6.0").agg(first($"col6.1"))
) {(df, c) => df.withColumnRenamed(c, c + "2")}

cols.foldLeft(df1.join(df2, "id")) {(df, c) => df.withColumn(c, coalesce(col(c + "2"), col(c)))}
  .select(cols.head, cols.tail: _*)
  .show

Результат:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   X|   Y|
|   Z|   Q|   R|
|   S|   J|   T|
+----+----+----+
...