Фильтрация некоторых данных из фрейма данных Spark на основе записей CSV - PullRequest
0 голосов
/ 31 мая 2019

У меня есть файл CSV, содержащий несколько слов. Всего № слов в файле csv не будет превышать 50 тыс. записей.

У меня есть Spark Dataframe, созданный из файла JSON, имеющего столбец keywords. Что мне нужно сделать, это отфильтровать записи из фрейма данных, чье значение столбца keywords совпадает со значением, присутствующим в файле CSV. Здесь «Соответствует» означает, что слово в файле csv появляется в столбце данных.

В качестве примера, скажем, в csv файле есть слово "baby toys" и spark dataframe выглядит так

***Keywords***
new baby toys
baby toys for all
costly baby toys price
baby has toys

В приведенных выше первых 3 строках должны быть отфильтрованы все слова со словом baby toys в последовательности.

Для реализации я делаю что-то вроде этого.

1. Reading csv file and creating a dataframe.

2. Collecting all the words as an array of strings from dataframe created above as
val negativeKeywords = csvDF.distinct.map(x => x.getString(0)).collect()

3. Creating a UDF to match the words - one from negative list created above and other from Spark dataframe(created in step 4)
 val udfmatch= udf((x: String) => {
      val loop = new Breaks
      var check = false
      loop.breakable{
        for(s <- negativeKeywords){
          if(x.contains(s)){
            check = true
            loop.break
          }
        }
      }
      check
  })

4. Created spark dataframe from JSON file. 
5. Filter from the above JSON dataframe using UDF defined above. 
   sparkDf.filter(udfmatch(col("keyword_text")))

В приведенном выше примере я перебираю весь список csv слов (пока не найду его) для каждого ключевого слова, присутствующего в Spark dataframe, что, на мой взгляд, неправильно и отнимает много времени. Может кто-нибудь, пожалуйста, предложите лучший способ.

1 Ответ

0 голосов
/ 31 мая 2019

Давайте начнем с Seq ключевых слов, которые приходят из CSV; мы можем использовать сгиб влево и использовать, как показано ниже код:

val conds= seq(("new toys")).toList
val result = conds.foldLeft(yourJsonDf){(newdf,conds)=>
 newdf.filter(!col("yourJsonColumnToFitlerOn").contains(conds))}

Это даст вам строки, которые не соответствуют вашему списку. если вы хотите иметь только эти отфильтрованные строки. Вы можете просто присоединиться к вашему «yourJsonColumntoFilteron» обратно к исходному DF и использовать левое соединение что-то вроде ниже кода

val filteredResults = originalDf.join(result ,Seq("yourJsonColumnToFilterOn"),"leftanti")
filteredResults.show()

По вашему мнению, возвращение нулевого DF связано с характером искровой обработки foldLeft. он не прерывает цикл итерации элементов в вашем списке. или Проще говоря, foldleft не выполняется N раз на основе элементов в вашем Списке. выполнение занимает всего 1 раз, что экономит вам много времени и ресурсов, и вам не нужно беспокоиться об очистке GC. надеюсь, что указанные выше изменения в ответе будут соответствовать вашему запросу. Cheers,

...