Конвертировать Spark Dataframe [Row] в Map [String, Any] - PullRequest
1 голос
/ 26 февраля 2020

Можно ли каким-либо образом преобразовать Spark Dataframe в Dataset[Map[String,Any]], чтобы можно было выполнить операцию задания на стороне карты в строке после ее преобразования в Map. Схема файла в основном изменчива, поэтому на самом деле невозможно создать класс наблюдений во время компиляции для использования кодировщика продукта, например dataframe.as[MyClass].

Сложность заключается в том, что данные могут быть вложенными и иметь Карты и списки внутри.

Пример данных, представленных в Json:

{
    "field1": "Sally",
    "field2": "Green",
    "field3": 27,
    "subObject": {
        "subField": "Value"
    },
    "fieldArray": ["A","B","C"],
    "accounting": [
        {
            "firstName": "John",
            "lastName": "Doe",
            "nestedSubField": {
                "x": "y"
            },
            "age": [11,2,33]
        },
        {
            "firstName": "Mary",
            "lastName": "Smith",
            "age": [11,2,33]
        }
    ],
    "sales": [
        {
            "firstName": "Sally",
            "lastName": "Green",
            "age": 27
        },
        {
            "firstName": "Jim",
            "lastName": "Galley",
            "age": 41
        }
    ]
}

Когда эти данные загружаются в фрейм данных, мы получаем следующую схему для фрейма данных .

Схема данных

  root
     |-- accounting: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- age: array (nullable = true)
     |    |    |    |-- element: long (containsNull = true)
     |    |    |-- firstName: string (nullable = true)
     |    |    |-- lastName: string (nullable = true)
     |    |    |-- nestedSubField: struct (nullable = true)
     |    |    |    |-- x: string (nullable = true)
     |-- field1: string (nullable = true)
     |-- field2: string (nullable = true)
     |-- field3: long (nullable = true)
     |-- fieldArray: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- sales: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- age: long (nullable = true)
     |    |    |-- firstName: string (nullable = true)
     |    |    |-- lastName: string (nullable = true)
     |-- subObject: struct (nullable = true)
     |    |-- subField: string (nullable = true)

Можно ли каким-либо образом преобразовать этот кадр данных в Map [String, Any], который будет выглядеть примерно так. Немного отформатировал.

Map(
    accounting -> List(
            Map(
                firstName -> John, 
                lastName -> Doe, 
                nestedSubField -> Map(x -> y), 
                age -> List(11, 2, 33)
            ),
            Map(
                firstName -> Mary, 
                lastName -> Smith, 
                age -> List(11, 2, 33)
            )
        ),
    fieldArray -> List(A, B, C),
    subObject -> Map(subField -> Value), 
    field1 -> Sally, 
    field3 -> 27, 
    sales -> List(
            Map(
                firstName -> Sally, 
                lastName -> Green, 
                age -> 27
                ), 
            Map(
                firstName -> Jim, 
                lastName -> Galley, 
                age -> 41)
                ), 
    field2 -> Green
)

В настоящее время я добиваюсь этого следующим образом. JsonUtil - это оболочка над Jackson API

val dataframeAsJsonDataset:Dataset[String] = dataframe.toJSON
val result:Dataset[Map[String,Any]] = dataframeAsJsonDataset.map(each=>JsonUtils.fromJson(each,classOf[Map[String,Any]]))

Вышеуказанный подход действительно плох и работает очень плохо. Любое предложение по этому вопросу было бы очень полезно.

1 Ответ

0 голосов
/ 07 марта 2020

Если ваша схема будет развиваться, качество данных может быть проблемой. Это требует использования чего-то вроде spark-records для пользовательского анализа данных с соответствующими проверками ошибок.

В качестве альтернативы вы можете сделать это с минимальным кодом (и безопасностью), прочитать данные в виде текста и затем проанализировать, используя, скажем, json4s, например,

sc.textFile(pathToJson)
  .map { json =>
    import org.json4s._
    org.json4s.jackson.JsonMethods.parse(json) match {
      case JObject(obj) =>
        // build your Map
    }
  }
  .toDF("my_map")
...