Я довольно новичок в scala, поэтому любые советы / основы приветствуются.Я пытаюсь получить доступ к изменяемой карте и изменить ее изнутри dataframe.foreach, но я не могу этого сделать.
Теперь я понимаю, как работает spark для нескольких исполнителей, и данные реплицируются на каждый узел для вычислений.Поэтому я провел поиск в Интернете и получил класс collectionAccumulator , который поможет сохранить коллекцию на разных узлах.
Мой код
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.joda.time.DateTime
import org.apache.spark.sql.{DataFrame, Row}
import spark.sparkContext._
import org.apache.spark.{AccumulableParam, SparkConf}
import org.apache.spark.serializer.JavaSerializer
import scala.collection.mutable.{ HashMap => MutableHashMap }
var m = scala.collection.mutable.Map("AL" -> "Alabama")
// this creates a collection(list) of accumulator
var mutableMapAcc= spark.sparkContext.collectionAccumulator[scala.collection.mutable.Map[String,String]]("mutableMap")
mutableMapAcc.add( scala.collection.mutable.Map("defaultKey" -> "defaultValue"))
var _mutableMap = scala.collection.mutable.Map("mmap" -> "mmapvalue")
val df = Seq(
("Andy","a1", 20,new DateTime().toString()),
("Berta","b1", 30,new DateTime().toString()),
("Joe","j1", 40,new DateTime().toString())).toDF("name","sector","age","AsOfDate")
println("===================================before foreach======================================================")
println(mutableMapAcc)
println("=========================================================================================")
df.foreach { row =>
println(mutableMapAcc.value.size)
mutableMapAcc.add(scala.collection.mutable.Map( row(0).toString() -> row(1).toString() ) )
println(mutableMapAcc.value)
}
println("===================================after foreach======================================================")
println(mutableMapAcc)
Вывод:
Каждый раз, когда я получаю размер mutableMapAcc равным нулю.Я хочу получить доступ к первой карте, которую я добавил в операторе (ниже)
mutableMapAcc.add( scala.collection.mutable.Map("defaultKey" -> "defaultValue"))
из цикла foreach, а затем заставить его вести себя как словарь (как мы делаем это в C #) и добавить в словарьнапример:
mutableMapAcc.value.get(0) += row(0).toString() -> row(1).toString()
Кроме того, я знаю, что такие структуры данных должны быть легковесными, поскольку широковещательная рассылка на все узлы будет происходить каждый раз, когда происходит чтение и запись на карту (в моем случае), и это нормально.для моего случая использования. Я просто хочу избежать введения другого сервиса для этого (например, redis / any db), если spark может сделать это для меня.