Как расширить массив данных массива struct и filter на основе длины уникальных элементов в каждой строке? - PullRequest
0 голосов
/ 07 мая 2019

У меня есть тип данных [value: array<struct<_1:string,_2:string>>] Я хочу добавить новый столбец к этому фрейму данных, который включает в себя длину всех уникальных элементов, извлекаемых путем расширения всех кортежей в каждой строке. Моя основная цель - отбросить строку, когда эта длина превышает определенное значение.

Пока у меня есть только длина каждой строки - код, показанный ниже

 val size = inputDF.rdd.map(_.getSeq[Row](0)).map(x => {
      val aSet = scala.collection.mutable.Set[String]()
      x.map {
        case Row(aa: String, bb: String) =>
          aSet += aa
          aSet += bb
      }
      (aSet.size)
    })

Однако, когда я пытаюсь добавить это как новый столбец к данным inputDF, это не работает.

Пример inputDF:

val inputDF = Seq(
        (Array(("A","B"))),
        (Array(("C","D"),("C","E"),("D","F"),("F","G"),("G","H"))),
        (Array(("C","D"))),
        (Array(("P","Q"),("R","S"),("T","U"),("T","V")))
     ).toDF  

И ожидаемый столбец, который будет добавлен к этому, имеет значения - 2,6,2,7

Ответы [ 2 ]

1 голос
/ 07 мая 2019

Если вы используете Spark версии 2.4.0 или выше, то вы можете сделать то же самое, не используя UDF (что должно быть более оптимизированным решением):

scala> inputDF.selectExpr("*", "size(array_distinct(flatten(transform(value, (v, i) -> array(v._1, v._2))))) as count").show(false)
+----------------------------------------+-----+
|value                                   |count|
+----------------------------------------+-----+
|[[A, B]]                                |2    |
|[[C, D], [C, E], [D, F], [F, G], [G, H]]|6    |
|[[C, D]]                                |2    |
|[[P, Q], [R, S], [T, U], [T, V]]        |7    |
+----------------------------------------+-----+

Подробнее о функциях высшего порядка Apache Spark: https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html

0 голосов
/ 07 мая 2019

Я бы предложил использовать UDF, который fold помещает элементы структуры в Set и возвращает size, как показано ниже:

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.Row

val df = Seq(
  Seq(("a", "b"), ("b", "c")),
  Seq(("d", "e"), ("f", "g"), ("g", "h")),
  Seq(("i", "j"))
).toDF("c1")

val distinctElemCount = udf{ (arr: Seq[Row]) =>
  arr.foldLeft(Set.empty[String])(
    (acc, r) => acc + r.getString(0) + r.getString(1)
  ).size
}

df.withColumn("count", distinctElemCount($"c1")).show(false)
// +------------------------+-----+
// |c1                      |count|
// +------------------------+-----+
// |[[a, b], [b, c]]        |3    |
// |[[d, e], [f, g], [g, h]]|5    |
// |[[i, j]]                |2    |
// +------------------------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...