Есть ли флаг, позволяющий избежать перетасовки данных / кортежей между оператором источника и объединением группы - PullRequest
0 голосов
/ 22 мая 2019

Я использую GroupCombine и GroupReduce с примером WordCount.Но в отличие от исходного примера, я не могу объединить фазу GroupCombine с фазой DataSource.По сути это означает, что между задачей DataSource и задачей GroupCombine будет происходить обмен данными.Это может снизить производительность для задания.

Я ожидаю, что оператор GroupCombine должен быть объединен с задачей DataSource.Есть ли какой-нибудь флаг, который мог бы помочь мне сделать это

Согласно документации GroupCombine, можно увидеть

 * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
 * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
 * only a sub-group.

Это фрагмент кода, который я использую для объединениялокально, а затем уменьшите все локальные группы.

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .combineGroup(new GroupCombineFunction[(String, Int), (String, Int)] {
    override def combine(values: lang.Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
      values.iterator().asScala.toSeq.groupBy(p => p._1).map(t => (t._1, t._2.map(_._2).sum)).foreach(p => out.collect(p))
    }
  })
  .reduceGroup(new GroupReduceFunction[(String, Int), (String, Int)] {
  override def reduce(values: lang.Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
    values.iterator().asScala.toSeq.groupBy(p => p._1).map(t => (t._1, t._2.map(_._2).sum)).foreach(p => out.collect(p))
  }
})

Вот диаграмма, показывающая jobGraph enter image description here Фаза GroupCombine, не объединенная с Фазой источника данных

FYI : Я ожидаю, что между фазами GroupCombine и GroupReduce произойдет перестановка, но не между фазами DataSource и GroupCombine.

...