Создание или доступ к аккумуляторам Spark от исполнителей - PullRequest
0 голосов
/ 29 января 2019

Мне нужны собственные метрики, которые определяются свойствами бизнес-объектов.Например, quadkeys, и я хочу объединить их на этапе обработки.Поэтому, когда исполнитель на этапе обработки сталкивается с новым агрегирующим тегом, создается новый накопитель метрик.Примерно так:

val ints = sparkSession.sparkContext.parallelize(0 to 9, 3)
//calculate odd and even numbers
ints.foreach { n =>
  println(s"int: $n")
  val metricAccumulator = getOrCreateAccumulator((n%2).toString)
  metricAccumulator.add(1)
}

Если есть способ создать аккумуляторы из исполнителей?Потому что я не делаю точных агрегирующих блоков заранее.Или какой подход лучше?

Обновление 1 Я создал собственный MapAccumulator на основе:

class HashMapAccumulator(var value: MutableHashMap[String, Int]) extends AccumulatorV2[(String, Int), MutableHashMap[String, Int]]{
  def this() = this(MutableHashMap.empty)

  override def isZero: Boolean = value.isEmpty

  override def copy(): AccumulatorV2[(String, Int), MutableHashMap[String, Int]] = new HashMapAccumulator()

  override def reset(): Unit = value = MutableHashMap.empty

  override def add(v: (String, Int)): Unit = value.get(v._1) match {
    case Some(e) => value.update(v._1, e + v._2)
    case None => value += v
  }

  override def merge(other: AccumulatorV2[(String, Int), MutableHashMap[String, Int]]): Unit = other match {
    case map: HashMapAccumulator =>
      map.value.foreach(v => this.add(v))
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }
}

и использую в таких тестах:

    val ints = sparkSession.sparkContext.parallelize(0 to 9, 3)
val accum = new HashMapAccumulator()
sparkSession.sparkContext.register(accum, "My Accum")
var counter = 0
ints.foreach { n =>
  println(s"int: $n")
  counter = counter + 1
  accum.add((n % 2).toString -> 1)
}

Похоже, это прекрасно работает.Любые предложения кто-нибудь?

...