Я использую GroupCombine и GroupReduce с примером WordCount.Но в отличие от исходного примера, я не могу объединить фазу GroupCombine
с фазой DataSource
.По сути это означает, что между задачей DataSource
и задачей GroupCombine
будет происходить обмен данными.Это может снизить производительность для задания.
Я ожидаю, что оператор GroupCombine
должен быть объединен с задачей DataSource
.Есть ли какой-нибудь флаг, который мог бы помочь мне сделать это
Согласно документации GroupCombine
, можно увидеть
* Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
* and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
* only a sub-group.
Это фрагмент кода, который я использую для объединениялокально, а затем уменьшите все локальные группы.
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.combineGroup(new GroupCombineFunction[(String, Int), (String, Int)] {
override def combine(values: lang.Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
values.iterator().asScala.toSeq.groupBy(p => p._1).map(t => (t._1, t._2.map(_._2).sum)).foreach(p => out.collect(p))
}
})
.reduceGroup(new GroupReduceFunction[(String, Int), (String, Int)] {
override def reduce(values: lang.Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
values.iterator().asScala.toSeq.groupBy(p => p._1).map(t => (t._1, t._2.map(_._2).sum)).foreach(p => out.collect(p))
}
})
Вот диаграмма, показывающая jobGraph Фаза GroupCombine, не объединенная с Фазой источника данных
FYI : Я ожидаю, что между фазами GroupCombine и GroupReduce произойдет перестановка, но не между фазами DataSource
и GroupCombine
.