Как получить 100+ слов любого размера в одинаковом предложении spark sql? - PullRequest
0 голосов
/ 03 апреля 2020

Мне нужно подавить набор ключевых слов из API в моем наборе данных по всем столбцам. В настоящее время у меня есть предложение like, подобное следующему.

SPARK версия: 2.1.0

where  lower(c) not like '%gun%'  and  lower(c) not like '%torture%'  and  lower(c) not like '%tough-mudder%'  and  lower(c) not like '%health & beauty - first aid%'  and  lower(c) not like '%hippie-butler%'

Я получаю следующую ошибку

"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage0" grows beyond 64 KB

Чтобы смягчить это, я пытаюсь разбить задачу на подзадачу, получив результат для каждых 15 ключевых слов, затем применив следующие 15 ключевых слов к полученному ранее результату. Поэтому для каждых 15 я вызываю метод для применения текущего набора из 15, и полученный результат передается в качестве входных данных одной и той же функции до тех пор, пока не завершатся все слова.

dataSet = getTransformedDataset(dataSet, word);

Мой запрос выглядит следующим образом:

select uuid , c  from cds where  lower(c) not like '%gun%'  and  lower(c) not like '%kill%'  and  lower(c) not like '%murder%' ..

Теперь он работает нормально для меньшего набора данных. Но для большего набора данных он занимает больше памяти, чем разрешено и которое мы настроили.

 Job aborted due to stage failure: Task 5 in stage 13.0 failed 4 times, most recent failure: Lost task 5.3 in stage 13.0 (TID 2093, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 50.0 GB of 49 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Добавлен код. Буду очень признателен за любую помощь.

private Dataset<Row> getTransformedRowDataset(final Dataset<Row> input,Dataset<Row> concatenatedColumnsDS, final List<String> regexToSuppress, final List<String> wordsToSuppress) {
    final String regexString = regexToSuppress.stream().collect(Collectors.joining("|", "(", ")"));
    final String likeString = wordsToSuppress
            .stream()
            .map(s -> " lower(c) not like '%" + s.toLowerCase() + "%' ")
            .collect(Collectors.joining(" and "));
    if(!likeString.isEmpty()) {
        concatenatedColumnsDS = concatenatedColumnsDS.where(likeString);
    }
    final Dataset<Row> joinedDs = input.join(concatenatedColumnsDS, "uuid");
    return  "()".equals(regexString) || regexString.isEmpty() ? joinedDs.drop("c") :
            joinedDs.where(" c not rlike '" + regexString + "'").drop("c");
}

1 Ответ

0 голосов
/ 06 апреля 2020

Попробуйте фильтр

{    package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col}

object filterWorld extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("Mapper")
    .getOrCreate()

  import spark.implicits._

  case class Person(
                     ID: Int,
                     firstName: String,
                     lastName: String,
                     description: String,
                     comment: String
                   )


  val personDF = Seq(
    Person(1, "FN1", "LN1", "TEST", "scala"),
    Person(2, "FN2", "LN2", "develop", "spark"),
    Person(3, "FN3", "LN3", "test", "sql"),
    Person(4, "FN4", "LN4", "develop", "java"),
    Person(5, "FN5", "LN5", "test", "c#"),
    Person(6, "FN6", "LN6", "architect", "python"),
    Person(7, "FN7", "LN7", "test", "spark"),
    Person(8, "FN8", "LN8", "architect", "scala"),
    Person(9, "FN9", "LN9", "qa", "hql"),
    Person(10, "FN10", "LN10", "manager", "haskell")
  ).toDF()

  personDF.show(false)
//  +---+---------+--------+-----------+-------+
//  |ID |firstName|lastName|description|comment|
//  +---+---------+--------+-----------+-------+
//  |1  |FN1      |LN1     |TEST       |scala  |
//  |2  |FN2      |LN2     |develop    |spark  |
//  |3  |FN3      |LN3     |test       |sql    |
//  |4  |FN4      |LN4     |develop    |java   |
//  |5  |FN5      |LN5     |test       |c#     |
//  |6  |FN6      |LN6     |architect  |python |
//  |7  |FN7      |LN7     |test       |spark  |
//  |8  |FN8      |LN8     |architect  |scala  |
//  |9  |FN9      |LN9     |qa         |hql    |
//  |10 |FN10     |LN10    |manager    |haskell|
//  +---+---------+--------+-----------+-------+
//
  val fltr = !col("description").like("%e%") && !col("comment").like("%s%")

  val res = personDF.filter(fltr)
  res.show(false)
//  +---+---------+--------+-----------+-------+
//  |ID |firstName|lastName|description|comment|
//  +---+---------+--------+-----------+-------+
//  |9  |FN9      |LN9     |qa         |hql    |
//  +---+---------+--------+-----------+-------+
}

}

...