Сопоставить словарь JSON с классом дел с помощью Spark - PullRequest
0 голосов
/ 12 сентября 2018

В последние часы я пытался преобразовать файл JSON в Scala case class с помощью Apache Spark.

JSON имеет следующую структуру:

{
  "12": {
    "wordA": 1,
    "wordB": 2,
    "wordC": 3
  },
  "13": {
    "wordX": 10,
    "wordY": 12,
    "wordZ": 15
  }
}

Первая попытка: установить схему построения

Я попытался создать мою схему искусственно:

val schema = new StructType()
   .add("",MapType(StringType, new StructType()
          .add("", StringType)
          .add("", IntegerType)))
val df = session.read
  .option("multiline",true)
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .json(filePath)
df.show()

Но это, очевидно, неправильно, поскольку я должен дать имя поля.

Вторая попытка: сопоставить с классом дела

Я также пытался создать case class es, что немного элегантнее:

case class KeywordData (keywordsByCode: Map[String, WordAndWeight])

case class WordAndWeight (word: String, weight: Int)

Проблема:

Но в любом случае df.show () отображает:

+----+
|    |
+----+
|null|
+----+

Структурой JSON нелегко манипулировать, поскольку у моих столбцов нет имени исправления. Есть идеи?

Ожидаемый результат

Карта с 12 и 13 в качестве ключа и List [wordA, ... wordC] соответственно List [wordX, ..., wordZ] в качестве значений

Редактировать: Карта карты С корпусом класса

case class WordAndWeight(code: Map[String, Map[String, Integer]])

Это дает мне следующую ошибку:

+-------+----------+
|     12|        13|
+-------+----------+
|[1,2,3]|[10,12,15]|
+-------+----------+


cannot resolve '`code`' given input columns: [12, 13];
org.apache.spark.sql.AnalysisException: cannot resolve '`code`' given input columns: [12, 13];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

1 Ответ

0 голосов
/ 12 сентября 2018

Вы пытаетесь определить схему, в которой MapType является корневым типом. Другими словами, вы хотите, чтобы каждая строка была картой. AFAIK Spark не поддерживает MapType как корневой тип. Он поддерживает только StructType в качестве корневого типа.

Когда вы определяете тип через case-классы и отражения следующим образом:

val schema = ScalaReflection.schemaFor[KeywordData].dataType.asInstanceOf[StructType]

Вы получаете StructType в качестве корневого типа:

root
  |-- keywordsByCode: map (nullable = true)
  |    |-- key: string
  |    |-- value: struct (valueContainsNull = true)
  |    |    |-- word: string (nullable = true)
  |    |    |-- weight: integer (nullable = true)

Это означает, что Spark создаст DataFrame с одним столбцом, который называется keywordsByCode. И он будет ожидать JSON, как это

{"keywordsByCode":{"12":{"wordA":1,"wordB":2,"wordC":3},"13":{"wordX":10,"wordY":12,"wordZ":15}}}

Вам нужно изменить JSON или прочитать файл как текст, а затем проанализировать каждую строку в JSON.

UPDATE

Я не заметил еще одной ошибки, ваш класс должен выглядеть так:

case class KeywordData (keywordsByCode: Map[String, Map[String, Int]])

Потому что ваш JSON имеет вложенный MapType. Таким образом, схема будет выглядеть так:

root
|-- keywordsByCode: map (nullable = true)
|    |-- key: string
|    |-- value: map (valueContainsNull = true)
|    |    |-- key: string
|    |    |-- value: integer (valueContainsNull = true)

Мой код тестирования:

val df = spark.read
  .option("multiline",true)
  .option("mode", "PERMISSIVE")
  .schema(ScalaReflection.schemaFor[KeywordData].dataType.asInstanceOf[StructType])
  .json("test.json")
 df.printSchema()
 df.explain(true)
 df.show(10)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...