Scala: доступ / редактирование карты из dataframe.foreach - PullRequest
0 голосов
/ 22 февраля 2019

Я довольно новичок в scala, поэтому любые советы / основы приветствуются.Я пытаюсь получить доступ к изменяемой карте и изменить ее изнутри dataframe.foreach, но я не могу этого сделать.

Теперь я понимаю, как работает spark для нескольких исполнителей, и данные реплицируются на каждый узел для вычислений.Поэтому я провел поиск в Интернете и получил класс collectionAccumulator , который поможет сохранить коллекцию на разных узлах.

Мой код

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.joda.time.DateTime

import org.apache.spark.sql.{DataFrame, Row} 
import spark.sparkContext._

import org.apache.spark.{AccumulableParam, SparkConf}
import org.apache.spark.serializer.JavaSerializer
import scala.collection.mutable.{ HashMap => MutableHashMap }


var m  =  scala.collection.mutable.Map("AL" -> "Alabama")


// this creates a collection(list) of accumulator 
var mutableMapAcc= spark.sparkContext.collectionAccumulator[scala.collection.mutable.Map[String,String]]("mutableMap") 
mutableMapAcc.add(  scala.collection.mutable.Map("defaultKey" -> "defaultValue"))

var _mutableMap = scala.collection.mutable.Map("mmap" -> "mmapvalue")

val df = Seq(
          ("Andy","a1", 20,new DateTime().toString()),     
          ("Berta","b1", 30,new DateTime().toString()),
          ("Joe","j1", 40,new DateTime().toString())).toDF("name","sector","age","AsOfDate")


println("===================================before foreach======================================================")
println(mutableMapAcc)
println("=========================================================================================")
df.foreach { row =>
 println(mutableMapAcc.value.size)
 mutableMapAcc.add(scala.collection.mutable.Map( row(0).toString() -> row(1).toString() ) )
 println(mutableMapAcc.value) 
}
println("===================================after foreach======================================================")
println(mutableMapAcc)

Вывод:

enter image description here

Каждый раз, когда я получаю размер mutableMapAcc равным нулю.Я хочу получить доступ к первой карте, которую я добавил в операторе (ниже)

mutableMapAcc.add(  scala.collection.mutable.Map("defaultKey" -> "defaultValue"))

из цикла foreach, а затем заставить его вести себя как словарь (как мы делаем это в C #) и добавить в словарьнапример:

mutableMapAcc.value.get(0) += row(0).toString() -> row(1).toString()

Кроме того, я знаю, что такие структуры данных должны быть легковесными, поскольку широковещательная рассылка на все узлы будет происходить каждый раз, когда происходит чтение и запись на карту (в моем случае), и это нормально.для моего случая использования. Я просто хочу избежать введения другого сервиса для этого (например, redis / any db), если spark может сделать это для меня.

...