Я новичок в аккумуляторах в Spark.Я создал аккумулятор, который собирает информацию о сумме и количестве всех столбцов в кадре данных в карту.Это не работает должным образом, поэтому у меня есть несколько сомнений.
Когда я запускаю этот класс (вставленный ниже) в локальном режиме, я вижу, что аккумуляторы обновляются, но окончательное значение все еще пусто.В целях отладки я добавил оператор print в add ().
Q1) Почему конечный накопительный ресурс не обновляется при добавлении аккумулятора?
Для справки я изучил накопитель CollectionsAccumulator, в котором они использовали SynchronizedList из коллекций Java.
Q2) Требуется ли синхронный / параллельный сбор для обновления аккумулятора?
Q3)Какая коллекция лучше всего подойдет для этой цели?
Я приложил свой поток выполнения вместе со снимком Spark UI для анализа.
Спасибо.
ИСПОЛНЕНИЕ:
ВХОДНАЯ ДАННАЯ КАДРА -
+-------+-------+
|Column1|Column2|
+-------+-------+
|1 |2 |
|3 |4 |
+-------+-------+
ВЫХОД -
Добавить - Карта (Столбец1 -> Карта (сумма -> 1, количество -> 1), Столбец2 -> Карта (сумма -> 2, количество -> 1))
Добавить - Карта (Столбец1 -> Карта (сумма ->)4, количество -> 2), столбец 2 -> карта (сумма -> 6, количество -> 2))
TestRowAccumulator (id: 1, name: Some (Test Accumulator for Sum & Count), значение: Map ())
SPARK UI SNAPSHOT -
CLASS:
class TestRowAccumulator extends AccumulatorV2[Row,Map[String,Map[String,Int]]]{
private var colMetrics: Map[String, Map[String, Int]] = Map[String , Map[String , Int]]()
override def isZero: Boolean = this.colMetrics.isEmpty
override def copy(): AccumulatorV2[Row, Map[String,Map[String,Int]]] = {
val racc = new TestRowAccumulator
racc.colMetrics = colMetrics
racc
}
override def reset(): Unit = {
colMetrics = Map[String,Map[String,Int]]()
}
override def add(v: Row): Unit = {
v.schema.foreach(field => {
val name: String = field.name
val value: Int = v.getAs[Int](name)
if(!colMetrics.contains(name))
{
colMetrics = colMetrics ++ Map(name -> Map("sum" -> value , "count" -> 1 ))
}else
{
val metric = colMetrics(name)
val sum = metric("sum") + value
val count = metric("count") + 1
colMetrics = colMetrics ++ Map(name -> Map("sum" -> sum , "count" -> count))
}
})
}
override def merge(other: AccumulatorV2[Row, Map[String,Map[String,Int]]]): Unit = {
other match {
case t:TestRowAccumulator => {
colMetrics.map(col => {
val map2: Map[String, Int] = t.colMetrics.getOrElse(col._1 , Map())
val map1: Map[String, Int] = col._2
map1 ++ map2.map{ case (k,v) => k -> (v + map1.getOrElse(k,0)) }
} )
}
case _ => throw new UnsupportedOperationException(s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
}
override def value: Map[String, Map[String, Int]] = {
colMetrics
}
}