программно добавить один или несколько фильтров условий в scala spark - PullRequest
0 голосов
/ 20 марта 2020

Как мне получить все строки из необработанного CSV-файла путем фильтрации с несколькими условиями. У меня есть необработанный файл, и я изменяю его на DF.

val text =  sc.textFile("hdfs:///data/text/")
case class TextFile(id:String, time:String,text:String)


val textDf = text.map(_.split(",")).map(s => TextFile(s(0).toString(),
                                        s(1).toString(),
                                        s(2).toString()
                                        )).toDF()

И у меня также есть файл условий.

val findWord =  sc.textFile("hdfs:///condition/text.txt").collect.toList

Если мне было известно, каковы условия, мне просто нужно написать вниз, как это

textDf.filter(lower($"text").contains("ok") || lower($"text").contains("yes"))

Были различные условия, поэтому я пытаюсь, как это

val test = findWord.map(v => s"""lower($$"text").contains("$v")""").mkString(" || ");

textDf.filter(test).collect

, но я не могу запустить его. Кроме того, печать (тест) - это то же самое, что и условие, которое мне нужно, не могу использовать в фильтре df.

org.apache.spark.sql.catalyst.parser.ParseException:

Как мне решить мою проблему?

Спасибо за вашу помощь и совет.

Ответы [ 2 ]

1 голос
/ 20 марта 2020

Попытка построить условие String - не лучшая практика, я бы сказал. Вместо этого вы можете манипулировать классом Column . Как это:

val condition = words.map(v => col("text").contains(s"$v")).reduce(_||_)

, который производит следующий столбец:

condition: org.apache.spark.sql.Column = (((contains(text, yes) OR contains(text, ok)) OR contains(text, k)) OR contains(text, y))

На примере :

 val words = List("yes", "ok", "k", "y")
 val condition = words.map(v => col("text").contains(s"$v")).reduce(_||_)
 val df = Seq( ("word"), ("text"), ("ok"), ("abc"), ("y") ).toDF("text")
 df.filter(condition).show

Вывод :

+----+
|text|
+----+
|  ok|
|   y|
+----+
0 голосов
/ 20 марта 2020

Вы можете динамически построить условия фильтрации на основе файла findWords. Предположим, что findWords является List[String], вы можете сделать что-то вроде этого

val accFilter = lit("1") === "1" // a column that has a default true condition
val composedFilter = findWords
 .foldLeft(accFilter){case(accFilter, word) => {
   accFilter || lower($"text").contains(word)
}}

, и фильтрация будет построена на основе условия ||. Тогда вы просто делаете

textDf.filter(composedFilter)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...