Spark SQL Split или извлечение слов из строки слов - PullRequest
3 голосов
/ 02 июня 2019

У меня есть датафрейм с искрой, как показано ниже. Я пытаюсь разбить столбец на еще 2 столбца:

date   time    content

28may  11am    [ssid][customerid,shopid]
val personDF2 = personDF.withColumn("temp",split(col("content"),"\\[")).select(
  col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s/col$i)): _*)
date time   content                       col1   col2        col3

28may 11    [ssid][customerid,shopid]     ssid   customerid  shopid

Ответы [ 2 ]

1 голос
/ 25 июня 2019

Предполагая, что строка представляет массив слов.Получил ваш запрос.Вы можете оптимизировать количество фреймов данных, чтобы уменьшить нагрузку на систему.Если есть более 9 столбцов и т. Д., Вам может понадобиться использовать c00, c01 и т. Д. Для c10 и т. Д. Или просто использовать целое число в качестве имени для столбцов.оставьте это на ваше усмотрение.

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

// Set up data
val df = spark.sparkContext.parallelize(Seq(
       ("A", "[foo][customerid,shopid][Donald,Trump,Esq][single]"),
       ("B", "[foo]")
     )).toDF("k", "v")

val df2 =  df.withColumn("words_temp",  regexp_replace($"v", lit("]"), lit("" )))
val df3 = df2.withColumn("words_temp2", regexp_replace($"words_temp", lit(","), lit("[" ))).drop("words_temp") 
val df4 = df3.withColumn("words_temp3", expr("substring(words_temp2, 2, length(words_temp2))")).withColumn("cnt", expr("length(words_temp2)")).drop("words_temp2") 
val df5 = df4.withColumn("words",split(col("words_temp3"),"\\[")).drop("words_temp3") 
val df6 = df5.withColumn("num_words", size($"words"))  
val df7 = df6.withColumn("v2", explode($"words"))

// Convert to Array of sorts via group by
val df8 = df7.groupBy("k")
            .agg(collect_list("v2"))
// Convert to rdd Tuple and then find position so as to gen col names! That is the clue so as to be able to use pivot
val rdd = df8.rdd
val rdd2 = rdd.map(row => (row.getAs[String](0), row.getAs[WrappedArray[String]](1).toArray))
val rdd3 = rdd2.map { case (k, list) => (k, list.zipWithIndex) }
val df9 = rdd3.toDF("k", "v")
val df10 = df9.withColumn("vn", explode($"v"))
val df11 = df10.select($"k", $"vn".getField("_1"), concat(lit("c"),$"vn".getField("_2"))).toDF("k", "v", "c")

// Final manipulation
val result = df11.groupBy("k")
                 .pivot("c")
                 .agg(expr("coalesce(first(v),null)")) // May never occur in your case, just done for completeness and variable length cols.
 result.show(100,false)

возвращается в этом случае:

+---+---+----------+------+------+-----+----+------+
|k  |c0 |c1        |c2    |c3    |c4   |c5  |c6    |
+---+---+----------+------+------+-----+----+------+
|B  |foo|null      |null  |null  |null |null|null  |
|A  |foo|customerid|shopid|Donald|Trump|Esq |single|
+---+---+----------+------+------+-----+----+------+
0 голосов
/ 05 июня 2019

Обновление: На основе оригинального названия с указанием массива слов. Смотрите другой ответ.

Если новый, то несколько вещей здесь. Может также быть сделано с набором данных и картой, которую я предполагаю. Вот решение с использованием DF и RDD. В будущем я вполне мог бы исследовать полный DS, но это работает точно и в масштабе.

// Can amalgamate more steps

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

// Set up data
val df = spark.sparkContext.parallelize(Seq(
    ("A", Array(Array("foo", "bar"), Array("Donald", "Trump","Esq"), Array("single"))),
    ("B", Array(Array("foo2", "bar2"), Array("single2"))),
    ("C", Array(Array("foo3", "bar3", "x", "y", "z")))
     )).toDF("k", "v")
// flatten via 2x explode, can be done more elegeantly with def or UDF, but keeping it simple here
val df2 = df.withColumn("v2", explode($"v"))
val df3 = df2.withColumn("v3", explode($"v2"))
// Convert to Array of sorts via group by
val df4 = df3.groupBy("k")
            .agg(collect_list("v3"))
// Convert to rdd Tuple and then find position so as to gen col names! That is the clue so as to be able to use pivot
val rdd = df4.rdd
val rdd2 = rdd.map(row => (row.getAs[String](0), row.getAs[WrappedArray[String]](1).toArray))
val rdd3 = rdd2.map { case (k, list) => (k, list.zipWithIndex) }
val df5 = rdd3.toDF("k", "v")
val df6 = df5.withColumn("vn", explode($"v"))
val df7 = df6.select($"k", $"vn".getField("_1"), concat(lit("c"),$"vn".getField("_2"))).toDF("k", "v", "c")

// Final manipulation
val result = df7.groupBy("k")
               .pivot("c")
               .agg(expr("coalesce(first(v),null)")) // May never occur in your case, just done for completeness and variable length cols.
result.show(100,false)

возвращает в правильном порядке:

+---+----+----+-------+-----+----+------+
|k  |c0  |c1  |c2     |c3   |c4  |c5    |
+---+----+----+-------+-----+----+------+
|B  |foo2|bar2|single2|null |null|null  |
|C  |foo3|bar3|x      |y    |z   |null  |
|A  |foo |bar |Donald |Trump|Esq |single|
+---+----+----+-------+-----+----+------+
...