Spark dataframe - заменяет токены общей строки значениями столбцов для каждой строки, используя scala - PullRequest
0 голосов
/ 09 сентября 2018

У меня есть датафрейм с 3 столбцами - число (целое число), имя (строка), цвет (строка). Ниже приведен результат df.show с параметром перераспределения.

val df = sparkSession.read.format("csv").option("header", "true").option("inferschema", "true").option("delimiter", ",").option("decoding", "utf8").load(fileName).repartition(5).toDF()

+------+------+------+
|Number|  Name| Color|
+------+------+------+
|     4|Orange|Orange|
|     3| Apple| Green|
|     1| Apple|   Red|
|     2|Banana|Yellow|
|     5| Apple|   Red|
+------+------+------+

Моя цель - создать список строк, соответствующих каждой строке, заменив токены в общей динамической строке , которую я передаю в качестве параметра методу, значениями столбцов Например: commonDynamicString = Column.Name с Column.Color color

В этой строке мои токены: Column.Name и Column.Color . Мне нужно заменить эти значения для всех строк соответствующими значениями в этом столбце. Примечание: эта строка может изменяться динамически, поэтому жесткое кодирование не будет работать.

Я не хочу использовать СДР , если с данным фреймом нет другой опции.

Ниже приведены подходы, которые я попробовал, но не смог достичь своей цели.

Вариант 1:

val a = df.foreach(t => {
 finalValue = commonString.replace("Column.Number", t.getAs[Any]("Number").toString())
          .replace("DF.Name", t.getAs("Name"))
          .replace("DF.Color", t.getAs("Color"))

          println ("finalValue: " +finalValue)
          })

При таком подходе finalValue печатается как положено. Однако я не могу создать буфер списка или передать отсюда окончательную строку в виде списка другой функции, так как foreach возвращает Unit и ошибка зажигания.

Вариант 2: Я думаю об этом параметре, но мне понадобятся некоторые рекомендации, чтобы понять, можно ли использовать Foldleft или Window или любые другие функции искры для создания 4-го столбца с именем «Final». используя опцию withColumn и использовать UDF, где я могу извлечь все токены, используя сопоставление с регулярным выражением - «Столбец. \ w +», и заменить операцию для токенов?

+------+------+------+--------------------------+
|Number|  Name| Color|      Final               |
+------+------+------+--------------------------+
|     4|Orange|Orange|Orange with orange color  |
|     3| Apple| Green|Apple with Green color    |
|     1| Apple|   Red|Apple with Red color      |
|     2|Banana|Yellow|Banana with Yellow color  |
|     5| Apple|   Red|Apple with Red color      |
+------+------+------+--------------------------+

Может ли кто-нибудь помочь мне с этой проблемой, а также сообщить мне, если я думаю в правильном направлении, чтобы использовать искру для обработки больших наборов данных?

Спасибо!

1 Ответ

0 голосов
/ 09 сентября 2018

Если я правильно понимаю ваше требование, вы можете создать метод столбца, скажем, parseStatement, который принимает оператор String -типа и возвращает Column со следующими шагами:

  1. Разбор ввода statement для подсчета количества токенов
  2. Создание шаблона регулярного выражения в форме ^(.*?)(token1)(.*?)(token2) ... (.*?)$
  3. Применить сопоставление с шаблоном для сборки colList, состоящего из освещенного (g1), col (g2), освещенного (g3), col (g4), ..., где g? s - извлеченные группы регулярных выражений
  4. Конкатенация Элементы типа столбца

Вот пример кода:

import spark.implicits._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def parseStatement(stmt: String): Column = {
  val token = "Column."
  val tokenPattern = """Column\.(\w+)"""
  val literalPattern = "(.*?)"
  val colCount = stmt.sliding(token.length).count(_ == token)

  val pattern = (0 to colCount * 2).map{
    case i if (i % 2 == 0) => literalPattern
    case _ => tokenPattern
  }.mkString

  val colList = ("^" + pattern + "$").r.findAllIn(stmt).
    matchData.toList.flatMap(_.subgroups).
    zipWithIndex.map{
      case (g, i) if (i % 2 == 0) => lit(g)
      case (g, i) => col(g)
  }

  concat(colList: _*)
}

val df = Seq(
  (4, "Orange", "Orange"),
  (3, "Apple", "Green"),
  (1, "Apple", "Red"),
  (2, "Banana", "Yellow"),
  (5, "Apple", "Red")
).toDF("Number", "Name", "Color")

val statement = "Column.Name with Column.Color color"

df.withColumn("Final", parseStatement(statement)).
  show(false)
// +------+------+------+------------------------+
// |Number|Name  |Color |Final                   |
// +------+------+------+------------------------+
// |4     |Orange|Orange|Orange with Orange color|
// |3     |Apple |Green |Apple with Green color  |
// |1     |Apple |Red   |Apple with Red color    |
// |2     |Banana|Yellow|Banana with Yellow color|
// |5     |Apple |Red   |Apple with Red color    |
// +------+------+------+------------------------+

Обратите внимание, что concat принимает параметры типа столбца, поэтому для значений столбцов необходимо col() и lit() для литералов.

...