Я конвертирую один формат данных в другой.Во время этого преобразования мне нужно проанализировать строку JSON, которая содержит List of Map [String, String / Numeric], и уменьшить ее до Map [String, String].Я делаю так:
val dfPhs = sparkSession.read.schema(buildSchema()).json(path)
val dsPoints: Dataset[DeviceStage] = dfPhs.map(parse)
def parse(p: Row): = {
// some generic types parse
val customfields = parseJsonMap(p, "other")
return ResultSchema(
id,
// more fields here
customfields
)
}
def parseJsonMap(p: Row, name: String): collection.immutable.Map[String,String] = {
val i = p.fieldIndex(name)
if (p.isNullAt(i))
Map.empty[String, String]
else {
JSON.parseFull(p.getString(i)) match {
case Some(map: List[Map[String, _]]) =>
map.map(map => {
map("name").toString -> (map("value") match {
case s: String => s
case i: java.lang.Number => i.toString
case _ => throw new Exception("Unexpected type $unexpectedType")
})}).toMap
case _ => Map.empty[String, String]
}
}
}
Если я закомментирую метод parseJsonMap и дам заглушку Map.empty [String, String], все будет работать как шарм.Я обнаружил, что статья Задачи Spark застряли на RUNNING .Это похоже на мой случай.Я попытался указать Map как неизменяемый явно, но столкнулся с той же проблемой.Похоже, сам итератор (операции map, toMap) не является потокобезопасным.Как я могу это изменить?