Найти повторяющиеся значения столбца массива в кадре данных в Scala - PullRequest
0 голосов
/ 02 апреля 2020

У меня есть фрейм данных со столбцом массива, например:

  val df = Seq(
  Array("abc", "abc", "null", "null"),
  Array("bcd", "bc", "bcd", "null"),
  Array("ijk", "abc", "bcd", "ijk")).toDF("col")

И выглядит так:

col:
["abc","abc","null","null"]
["bcd","bc","bcd","null"]
["ijk","abc","bcd","ijk"]

Я пытаюсь получить дублирующее значение каждого массива в scala :

col_1:
['abc']
['bcd']
['ijk']

Я пытался получить дублирующее значение в списке, но не знал, как это можно сделать с помощью столбца массива

 val df = List("bcd", "bc", "bcd", "null")
 df.groupBy(identity).collect { case (x, List(_,_,_*)) => x }

Ответы [ 3 ]

2 голосов
/ 02 апреля 2020

Вы можете просто использовать пользовательский UDF

def findDuplicate = udf((in: Seq[String]) =>
  in.groupBy(identity)
    .filter(_._2.length > 1)
    .keys
    .toArray
)

df.withColumn("col_1", explode(findDuplicate($"col")))
  .show()

, если хотите пропустить значения null (как в вашем примере), просто добавьте .filterNot(_ == "null") перед .groupBy

2 голосов
/ 02 апреля 2020
df.withColumn("id", monotonically_increasing_id())
  .withColumn("col", explode(col("col")))
  .groupBy("id", "col")
  .count()
  .filter(col("count") > 1 /*&& col("col") =!= "null"*/)
  .select("col")
  .show()
1 голос
/ 02 апреля 2020

Повторяющиеся значения столбца массива можно получить, назначив монотонно увеличивающийся идентификатор каждому массиву, взорвав массив, а затем сгруппировав окно по идентификатору и столбцу.

import org.apache.spark.sql.functions.max
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._
val df = spark.sparkContext.parallelize(Seq(
  Array("abc", "abc", null, null),
  Array("bcd", "bc", "bcd", null),
  Array("ijk", "abc", "bcd", "ijk"))).toDF("col")
df.show(10)

val idfDF = df.withColumn("id", monotonically_increasing_id)
val explodeDF = idfDF.select(col("id"), explode(col("col")))

val countDF = explodeDF.groupBy("id", "col").count()

// Windows are partitions of id
val byId = Window.partitionBy("id")
val maxDF = countDF.withColumn("max", max("count") over byId)

val finalDf = maxDF.where("max == count").where("col is not null").select("col")
finalDf.show(10)

+---+
|col|
+---+
|abc|
|ijk|
|bcd| 
+---+
...