Сопоставить отдельные значения в одном кадре данных со значениями в другом кадре данных - PullRequest
0 голосов
/ 29 апреля 2018

У меня есть фрейм данных (DF1) с двумя столбцами

+-------+------+
|words  |value |
+-------+------+
|ABC    |1.0   |
|XYZ    |2.0   |
|DEF    |3.0   |
|GHI    |4.0   |
+-------+------+

и другой фрейм данных (DF2), подобный этому

+-----------------------------+
|string                       |
+-----------------------------+
|ABC DEF GHI                  |
|XYZ ABC DEF                  |                
+-----------------------------+

Я должен заменить отдельные строковые значения в DF2 их соответствующими значениями в DF1 ... например, после операции мне нужно вернуть этот кадр данных.

+-----------------------------+
|stringToDouble               |
+-----------------------------+
|1.0 3.0 4.0                  |
|2.0 1.0 3.0                  |                
+-----------------------------+

Я пробовал несколько способов, но не могу найти решение.

 def createCorpus(conversationCorpus: Dataset[Row], dataDictionary: Dataset[Row]): Unit = {
 import spark.implicits._

 def getIndex(word: String): Double = {
 val idxRow = dataDictionary.selectExpr("index").where('words.like(word))
 val idx = idxRow.toString
 if (!idx.isEmpty) idx.trim.toDouble else 1.0
 }

 conversationCorpus.map { //eclipse doesnt like this map here.. throws an error..
    r =>
    def row = {
       val arr = r.getString(0).toLowerCase.split(" ")
       val arrList = ArrayBuffer[Double]()
       arr.map {
          str =>
          val index = getIndex(str)
       }
       Row.fromSeq(arrList.toSeq)
       }
       row

   }
 }

1 Ответ

0 голосов
/ 29 апреля 2018

Объединение нескольких фреймов данных для создания новых столбцов потребует объединения . И, глядя на два ваших кадра данных, кажется, мы можем объединить по words столбцу df1 и string столбцу df2, но столбцу string требуется explode и комбинация позже ( что может быть сделано путем предоставления уникальных идентификаторов каждой строке перед взрывом). monotically_increasing_id дает уникальные идентификаторы для каждой строки в df2. split функция превращает string столбец в массив для разнесения . Тогда вы можете join их. а затем остальные шаги объединяют обратно разнесенные строки обратно в исходные , выполняя groupBy и агрегацию .

Наконец, собранный столбец массива можно изменить на нужный строковый столбец с помощью функции udf

.

Короче говоря, следующее решение должно работать для вас

import org.apache.spark.sql.functions._
def arrayToString = udf((array: Seq[Double])=> array.mkString(" "))

df2.withColumn("rowId", monotonically_increasing_id())
  .withColumn("string", explode(split(col("string"), " ")))
  .join(df1, col("string") === col("words"))
  .groupBy("rowId")
  .agg(collect_list("value").as("stringToDouble"))
  .select(arrayToString(col("stringToDouble")).as("stringToDouble"))

что должно дать вам

+--------------+
|stringToDouble|
+--------------+
|1.0 3.0 4.0   |
|2.0 1.0 3.0   |
+--------------+
...