Каков наилучший подход для сбора результатов из набора данных Spark? - PullRequest
0 голосов
/ 17 мая 2018

У меня есть набор данных из 2 полей / столбцов, col1 и col2, и оба типа Array[String].Я собираю отдельные элементы из col1 в отсортированном порядке для каждого элемента _dArr и сохраняю только первый элемент в списке с именем _list.Проблема в val arr = _ds.take(1)(0), что занимает много времени.Есть ли лучший подход, который я могу использовать, чтобы сделать код более эффективным.Я также взял пустой набор данных и сделал объединение, чтобы добавить новые строки, которые я получал как _ds из каждого цикла for, чтобы позже я мог сделать collect для этого результирующего набора данных, чтобы получить результат как List[Array[String]].Но это также заняло много времени.

case class ColFields(fields: Array[String])
var _dArr = null: Array[String]
var _list = List[ClassFields]()

    _dArr.foreach(x => {
      val _ds = df.select("col1")
        .where(array_contains(df("col2"), x))
        .withColumn("col1", explode(col("col1")))
        .agg(sort_array(collect_set("col1")).alias("col1")).as[Array[String]]
        .cache()

      val arr = _ds.take(1)(0)
      // val arr = _ds.collect()(0) // Does not make any performance difference.
      distPredDS.unpersist()

      _list ::= ColFields(arr)
    })

Пример ввода:

_dArr = Array[x, y, w]

col1         col2
------------------
[a, b]      [w, x]
[a, d]      [x, y]
[c, d, f]   [y]
[h]         [w]

Вывод:

List[Array[a, b, d], Array[a, c, d, f], Array[a, b, h]]

или

List[List[a, b, d], List[a, c, d, f], List[a, b, h]]

1 Ответ

0 голосов
/ 18 мая 2018

Исходя из того, что я понял, вы пытаетесь найти, содержит ли col2 какой-либо элемент из dArray, если да, то получить первый элемент, отсортированный из col1, в список.

    val dArr = Array("x","y","z")
    val df = Seq((Array("s","x","w"),Array("a","x")),(Array("v","d","e"),Array("b","z")),(Array("a","b","c"),Array("f","d"))).toDF("col1","col2")
   def checkValues(arr: Array[String]) = udf((col1: Seq[String], col2: Seq[String]) => { if (col2.exists(arr.contains)) col1.toList.sortWith(_ < _).head else ""})
   df.withColumn("col1_first", checkValues(dArr)($"col1",$"col2")).select($"col1_first").collectAsList

Здесь я передаю col1 & col2 в UDF, где он сравнивает col2 с arr и получает первый элемент col1 после сортировки. Затем вы можете отфильтровать непустое значение и collectAsList.

Дайте мне знать, если это работает, в противном случае предоставьте больше информации, включая ожидаемые образцы данных.

Я изменил, но взорвать кажется дорогостоящей операцией. Посмотрите, поможет ли это с вашим набором данных,

val dArr = Array("x","y","w")
val df = Seq((Array("a","b"),Array("w","x")),(Array("a","d"),Array("x","y")),(Array("c","d","f"),Array("x","y")),(Array("h"),Array("w"))).toDF("col1","col2")
def checkValues(arr: Array[String]) = udf((col1: Seq[String], col2: Seq[String]) => { arr.foldLeft(Array[String]()){case(acc, elem) => {if (col2.contains(elem)) acc :+ elem else acc}}})
val df_1 = df.withColumn("arrValues", explode(checkValues(dArr)($"col1",$"col2")))
df_1.select($"arrValues",explode($"col1").as("col1")).groupBy($"arrValues").agg(collect_set($"col1").as("col1_list")).show
...