Назначение столбцов другим столбцам в кадре данных Spark с использованием Scala - PullRequest
1 голос
/ 09 марта 2019

Я смотрел на этот превосходный вопрос, чтобы улучшить свои навыки в Scala и ответить на него: Извлечь значение столбца и назначить его другому столбцу в виде массива в фрейме данных spark

Я создал свой модифицированный код следующим образом, который работает, но у меня осталось несколько вопросов:

import spark.implicits._   
import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(
    ("r1", 1, 1),
    ("r2", 6, 4),
    ("r3", 4, 1),
    ("r4", 1, 2)
  )).toDF("ID", "a", "b")

val uniqueVal = df.select("b").distinct().map(x => x.getAs[Int](0)).collect.toList    
def myfun: Int => List[Int] = _ => uniqueVal 
def myfun_udf = udf(myfun)

df.withColumn("X", myfun_udf( col("b") )).show

+---+---+---+---------+
| ID|  a|  b|        X|
+---+---+---+---------+
| r1|  1|  1|[1, 4, 2]|
| r2|  6|  4|[1, 4, 2]|
| r3|  4|  1|[1, 4, 2]|
| r4|  1|  2|[1, 4, 2]|
+---+---+---+---------+

Работает, но:

  • Я отмечаю, что столбец b вставляется дважды.
  • Я также могу поставить в столбце a второе утверждение, и я получаю тот же результат. Например. и какой смысл это тогда?

df.withColumn ("X", myfun_udf (col ("a"))). Show

  • Если я введу идентификатор столбца, он будет нулевым.
  • Итак, мне интересно, почему вводится второй столбец?
  • И как это можно сделать для общей работы для всех столбцов?

Итак, это был код, который я посмотрел в другом месте, но я что-то упустил.

1 Ответ

1 голос
/ 09 марта 2019

Код, который вы показали, не имеет особого смысла:

  • Не масштабируется - в худшем случае размер каждой строки пропорционален размеру
  • Как вы уже поняли, аргумент вообще не нужен.
  • Это не нужно (и, что важно, не нужно) udf на момент написания (на 2016-12-23 Spark 1.6 и 2.0, где уже выпущено)
  • Если вы все еще хотите использовать udf, то будет достаточно нулевого варианта

В целом, это просто еще один запутанный и вводящий в заблуждение ответ, который послужил ОП в тот момент. Я бы проигнорировал (или проголосовал бы соответственно ) и пошел бы дальше.

Итак, как это можно сделать:

  • Если у вас есть локальный список, и вы действительно хотите использовать udf. Для одной последовательности используйте udf с функцией nullary:

    val uniqueBVal: Seq[Int] = ???
    val addUniqueBValCol = udf(() => uniqueBVal)
    
    df.withColumn("X", addUniqueBValCol())
    

    Обобщить до:

    import scala.reflect.runtime.universe.TypeTag
    
    def addLiteral[T : TypeTag](xs: Seq[T]) = udf(() => xs)
    
    val x = addLiteral[Int](uniqueBVal)
    df.withColumn("X", x())
    
  • Лучше не использовать udf:

    import org.apache.spark.sql.functions._
    
    df.withColumn("x", array(uniquBVal map lit: _*))
    
  • По состоянию на

    И как это можно сделать для всех столбцов?

    как уже упоминалось в начале, всю концепцию трудно защитить. Любые оконные функции (полностью не масштабируемые)

    import org.apache.spark.sql.expressions.Window
    
    val w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    df.select($"*" +: df.columns.map(c => collect_set(c).over(w).alias(s"${c}_unique")): _*)
    

    или перекрестное соединение с агрегатом (большую часть времени не масштабируемое)

    val uniqueValues = df.select(
      df.columns map (c => collect_set(col(c)).alias(s"${c}_unique")):_*
    )
    df.crossJoin(uniqueValues)
    

    В общем, хотя - вам придется переосмыслить свой подход, если это произойдет в реальных приложениях, если вы точно не знаете, что количество столбцов невелико и имеет строгие верхние границы.

Уберите сообщение: не верьте случайному коду, который случайные люди публикуют в Интернете. Этот включен.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...