Как сделать преобразование карт безопасным для предотвращения зависаний искровых исполнителей - PullRequest
0 голосов
/ 11 апреля 2019

Я конвертирую один формат данных в другой.Во время этого преобразования мне нужно проанализировать строку JSON, которая содержит List of Map [String, String / Numeric], и уменьшить ее до Map [String, String].Я делаю так:

val dfPhs = sparkSession.read.schema(buildSchema()).json(path)
val dsPoints: Dataset[DeviceStage] = dfPhs.map(parse)
def parse(p: Row): = {
      // some generic types parse
      val customfields = parseJsonMap(p, "other")

      return ResultSchema(
        id,
        // more fields here
        customfields
      )
  }

def parseJsonMap(p: Row, name: String): collection.immutable.Map[String,String] = {
    val i = p.fieldIndex(name)
    if (p.isNullAt(i))
      Map.empty[String, String]
    else {
      JSON.parseFull(p.getString(i)) match {
        case Some(map: List[Map[String, _]]) =>
          map.map(map => {
          map("name").toString -> (map("value") match {
            case s: String => s
            case i: java.lang.Number => i.toString
            case _ => throw new Exception("Unexpected type $unexpectedType")
        })}).toMap
        case _ => Map.empty[String, String]
      }
    }
  }

Если я закомментирую метод parseJsonMap и дам заглушку Map.empty [String, String], все будет работать как шарм.Я обнаружил, что статья Задачи Spark застряли на RUNNING .Это похоже на мой случай.Я попытался указать Map как неизменяемый явно, но столкнулся с той же проблемой.Похоже, сам итератор (операции map, toMap) не является потокобезопасным.Как я могу это изменить?

...