aggregateByKey не обновляет значение начального набора - PullRequest
0 голосов
/ 22 января 2020

Значения в hllTotal обновляются, но hllToday остается равным нулю для каждого ключа.
Может кто-нибудь помочь, почему hllToday здесь не обновляется?

 val hllToday: HllSerializable = new HllSerializable(new HLL(13, 5))
 val hllTotal: HllSerializable = new HllSerializable(new HLL(13, 5))

 val initialSet = (hllToday, hllTotal)
 val rdd7 = combinedRdd.map(tuple => {
      tuple.segmentId -> (tuple.entryTime, tuple.exitTime, tuple.hll)`
    }).aggregateByKey(initialSet)(
      (acc: (HllSerializable, HllSerializable), v: (Long, Long, HllSerializable)) => {
        if (v._1 == today_ts) {
          acc._1.getHll.union(v._3.getHll)
        }
        acc._2.getHll.union(v._3.getHll)
        acc
      }, (acc1: (HllSerializable, HllSerializable), acc2: (HllSerializable, HllSerializable)) =>    {
        acc1._1.getHll.union(acc2._1.getHll)
        acc1._2.getHll.union(acc2._2.getHll)
        acc1
      }
    )

1 Ответ

0 голосов
/ 22 января 2020

Очевидно, этот код никогда не вызывается:

if (v._1 == today_ts) {
    acc._1.getHll.union(v._3.getHll)
 }

Попробуйте заменить if (v._1 == today_ts) на if(true), чтобы узнать, обновляется ли hllToday

...