Искровой аккумулятор - PullRequest
       2

Искровой аккумулятор

0 голосов
/ 12 февраля 2019

Я новичок в аккумуляторах в Spark.Я создал аккумулятор, который собирает информацию о сумме и количестве всех столбцов в кадре данных в карту.Это не работает должным образом, поэтому у меня есть несколько сомнений.

Когда я запускаю этот класс (вставленный ниже) в локальном режиме, я вижу, что аккумуляторы обновляются, но окончательное значение все еще пусто.В целях отладки я добавил оператор print в add ().

Q1) Почему конечный накопительный ресурс не обновляется при добавлении аккумулятора?

Для справки я изучил накопитель CollectionsAccumulator, в котором они использовали SynchronizedList из коллекций Java.

Q2) Требуется ли синхронный / параллельный сбор для обновления аккумулятора?

Q3)Какая коллекция лучше всего подойдет для этой цели?

Я приложил свой поток выполнения вместе со снимком Spark UI для анализа.

Спасибо.

ИСПОЛНЕНИЕ:

ВХОДНАЯ ДАННАЯ КАДРА -

+-------+-------+
|Column1|Column2|
+-------+-------+
|1      |2      |
|3      |4      |
+-------+-------+

ВЫХОД -

Добавить - Карта (Столбец1 -> Карта (сумма -> 1, количество -> 1), Столбец2 -> Карта (сумма -> 2, количество -> 1))

Добавить - Карта (Столбец1 -> Карта (сумма ->)4, количество -> 2), столбец 2 -> карта (сумма -> 6, количество -> 2))

TestRowAccumulator (id: 1, name: Some (Test Accumulator for Sum & Count), значение: Map ())

SPARK UI SNAPSHOT -

SPARK UI -

CLASS:

class TestRowAccumulator extends AccumulatorV2[Row,Map[String,Map[String,Int]]]{

  private var colMetrics: Map[String, Map[String, Int]] = Map[String , Map[String , Int]]()


  override def isZero: Boolean = this.colMetrics.isEmpty

  override def copy(): AccumulatorV2[Row, Map[String,Map[String,Int]]] = {
    val racc = new TestRowAccumulator
    racc.colMetrics = colMetrics
    racc
  }

  override def reset(): Unit = {
    colMetrics = Map[String,Map[String,Int]]()
  }

  override def add(v: Row): Unit = {

    v.schema.foreach(field => {
      val name: String = field.name
      val value: Int = v.getAs[Int](name)
      if(!colMetrics.contains(name))
        {
          colMetrics = colMetrics ++ Map(name -> Map("sum" -> value , "count" -> 1 ))
        }else
        {
          val metric = colMetrics(name)
          val sum = metric("sum") + value
          val count = metric("count") + 1

          colMetrics = colMetrics ++ Map(name -> Map("sum" -> sum , "count" -> count))
        }
    })
  }

  override def merge(other: AccumulatorV2[Row, Map[String,Map[String,Int]]]): Unit = {
    other match {
      case t:TestRowAccumulator => {
        colMetrics.map(col => {
          val map2: Map[String, Int] = t.colMetrics.getOrElse(col._1 , Map())
          val map1: Map[String, Int] = col._2
          map1 ++ map2.map{ case (k,v) => k -> (v + map1.getOrElse(k,0)) }
        } )
      }
      case _ => throw new UnsupportedOperationException(s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  override def value: Map[String, Map[String, Int]] = {
    colMetrics
  }
}

1 Ответ

0 голосов
/ 13 февраля 2019

После небольшой отладки я обнаружил, что вызывается функция слияния.У него был ошибочный код, поэтому накопленное значение было Map ()

ИСПОЛНЕНИЕ ПОТОКА АККУМУЛЯТОРА (МЕСТНЫЙ РЕЖИМ): ДОБАВИТЬ ДОБАВИТЬ MERGE

Как только я исправил функцию слияния, аккумулятор заработал, как и ожидалось

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...