Я хочу сохранить Map [String, String] на диск и позже прочитать его как тот же тип. Почему-то я не могу найти метод collectAsMap с моим sparkContext - PullRequest
0 голосов
/ 31 августа 2018

Я работаю над Spark Scala, и существует требование сохранить Map[String, String] на диск, чтобы другое приложение Spark могло его прочитать.

(x,1),(y,2)...

Сохранить:

sc.parallelize(itemMap.toSeq).coalesce(1).saveAsTextFile(fileName)

Я делаю объединение, так как данные только 450 строк.

Но, чтобы прочитать его обратно, я не могу преобразовать его обратно в Map[String, String]

val myMap = sc.textFile(fileName).zipWithUniqueId().collect.toMap

данные поступают как

((x,1),0),((y,2),1)...

Какое возможное решение?

Спасибо.

Ответы [ 2 ]

0 голосов
/ 31 августа 2018

Метод «collectAsMap» существует в классе «PairRDDFunctions», значит, применим только для RDD с двумя значениями RDD [(K, V)].

Если этот вызов функции необходим, может быть организован с кодом ниже. Dataframe используется для хранения в формате csv и избегает анализа вручную

val originalMap = Map("x" -> 1, "y" -> 2)
// write
sparkContext.parallelize(originalMap.toSeq).coalesce(1).toDF("k", "v").write.csv(path)

// read
val restoredDF = spark.read.csv(path)
val restoredMap = restoredDF.rdd.map(r => (r.getString(0), r.getString(1))).collectAsMap()
println("restored map: " + restoredMap)

Выход:

restored map: Map(y -> 2, x -> 1)
0 голосов
/ 31 августа 2018

Загрузка текстового файла приводит к RDD[String], поэтому вам придется десериализовать ваши строковые представления кортежей.

Вы можете изменить операцию сохранения, чтобы добавить разделитель между значением кортежа 1 и значением кортежа 2, или проанализировать строку (:v1, :v2).

val d = spark.sparkContext.textFile(fileName)

val myMap = d.map(s => {
    val parsedVals = s.substring(1, s.length-1).split(",")
    (parsedVals(0), parsedVals(1))
}).collect.toMap

Кроме того, вы можете изменить операцию сохранения, чтобы создать разделитель (например, запятую) и проанализировать структуру следующим образом:

itemMap.toSeq.map(kv => kv._1 + "," + kv._2).saveAsTextFile(fileName)
val myMap = spark.sparkContext.textFile("trash3.txt")
  .map(_.split(","))
  .map(d => (d(0), d(1)))
  .collect.toMap
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...