Стоит ли выбирать RDD вместо DataSet / DataFrame, если я собираюсь выполнить много агрегаций по ключу? - PullRequest
0 голосов
/ 13 февраля 2019

У меня есть сценарий использования, в котором я собираюсь группировать по ключу (ключам) при агрегировании по столбцам.Я использую набор данных и пытался выполнить эти операции с помощью groupBy и agg.Например, возьмите следующий сценарий

case class Result(deptId:String,locations:Seq[String])
case class Department(deptId:String,location:String)

// using spark 2.0.2
// I have a Dataset `ds` of type Department   

+-------+--------------------+
|deptId |      location      |
+-------+--------------------+
|     d1|delhi               |            
|     d1|mumbai              |
|    dp2|calcutta            |
|    dp2|hyderabad           |       
+-------+--------------------+

Я намеревался преобразовать его в

// Dataset `result` of type Result

+-------+--------------------+
|deptId |      locations     |
+-------+--------------------+
|     d1|[delhi,mumbai]      |            
|    dp2|[calcutta,hyderabad]|            
+-------+--------------------+

Для этого я искал в стеке и нашел следующее:

val flatten = udf(
  (xs: Seq[Seq[String]]) => xs.flatten)

val result = ds.groupBy("deptId").
                agg(flatten(collect_list("location")).as("locations")

Вышесказанное показалось мне довольно аккуратным

  1. Но прежде чем искать вышеупомянутое, я сначала проверил, есть ли у Dataset встроенный reduceByKey, как у RDD.Но не смог найти, поэтому выбрал выше.Но я прочитал эту статью grouByKey против ReduByKey и узнал, что reduceByKey имеет меньше перемешиваний и более эффективен.Какая моя первая причина задать вопрос, должен ли я выбрать RDD в моем сценарии?
  2. Причиной, по которой я изначально выбрал Dataset, было исключительно принудительное использование типа, т.е.каждая строка имеет тип Department.Но как мой результат имеет совершенно другую схему, я должен беспокоиться о безопасности типов?Поэтому я попытался сделать result.as[Result], но, похоже, это не делает никакой проверки типа времени компиляции.Еще одна причина, по которой я выбрал Dataset, заключалась в том, что я передам результирующий набор данных какой-то другой функции, поскольку структура облегчает поддержку кода.Кроме того, класс case может быть сильно вложенным, я не могу себе представить поддержание этого вложения в pairRDD во время записи операций сокращения / отображения.
  3. Еще одна вещь, в которой я не уверен, - это использование udf.Я наткнулся на post , где люди сказали, что они предпочли бы изменить набор данных на RDD, а не использовать udf для сложных агрегаций / grouby.
  4. Я также немного погуглил и увидел посты / статьи, в которых люди говорили, что в Dataset накладные расходы при проверке типа, но в более высокой версии спарк лучшая производительность по сравнению с RDD.Опять не уверен, должен ли я вернуться к СДР?

PS: пожалуйста, прости, если я неправильно использовал некоторые термины.

1 Ответ

0 голосов
/ 13 февраля 2019

Чтобы ответить на некоторые из ваших вопросов:

  • groupBy + agg не groupByKey - DataFrame / набор данных groupBy поведение / оптимизация - в общем случае.Существуют конкретные случаи, когда он может вести себя как единое целое, включая collect_list.
  • reduceByKey не лучше, чем RDD в стиле groupByKey, когда требуется groupByKey -подобная логика- Будьте внимательны к groupByKey - и на самом деле это почти всегда хуже.

  • Существует важный компромисс между статической проверкой типов и производительностью в Spark's Dataset - Набор данных Spark 2.0 против DataFrame

  • Связанный пост специально рекомендует не использовать UserDefinedAggregateFunction (не UserDefinedFunction) из-за чрезмерногокопирование данных - UDAF Spark с ArrayType в качестве проблем с производительностью bufferSchema

  • Вам даже не нужно UserDefinedFunction, поскольку в вашем случае выравнивание не требуется:

    val df = Seq[Department]().toDF
    
    df.groupBy("deptId").agg(collect_list("location").as("locations"))
    

    И это то, что вы должны пойти на .

    Статически типизированный эквивалент будет

    val ds = Seq[Department]().toDS
    
    ds
      .groupByKey(_.deptId)
      .mapGroups { case (deptId, xs) => Result(deptId, xs.map(_.location).toSeq) }
    

    значительно дороже, чем DataFrame опция.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...