Как JSON-экранировать поле String в DataFrame Spark с новым столбцом - PullRequest
0 голосов
/ 14 июня 2019

Как написать новый столбец в формате JSON через DataFrame. Я попробовал несколько подходов, но он записывает данные в виде строкового поля с JSON-экранированием. В настоящее время его написание как {"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}

Вместо этого я хочу, чтобы это было как {"test":{"id":1,"name":"name","problem_field": {"x":100,"y":200}}}

problem_field - это новый столбец, который создается на основе значений, считанных из других полей как:

val dataFrame = oldDF.withColumn("problem_field", s)

Я попробовал следующие подходы

  1. dataFrame.write.json(<<outputPath>>)
  2. dataFrame.toJSON.map(value => value.replace("\\", "").replace("{\"value\":\"", "").replace("}\"}", "}")).write.json(<<outputPath>>)

Пробовал конвертировать в DataSet, но не повезло. Любые указатели приветствуются.

Я уже попробовал логику, упомянутую здесь: Как разрешить Spark анализировать поле String, экранированное JSON, как объект JSON, чтобы вывести правильную структуру в DataFrames?

1 Ответ

1 голос
/ 14 июня 2019

Для начала, данные вашего примера имеют постороннюю запятую после "y\":200, что предотвратит их анализ, так как это недопустимый JSON.

Оттуда вы можете использовать from_json для анализа поля, предполагая, что вы знаете схему. В этом примере я разбираю поле отдельно, чтобы сначала получить схему:

scala> val json = spark.read.json(Seq("""{"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> json.printSchema
root
 |-- test: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: string (nullable = true)


scala> val problem_field = spark.read.json(json.select($"test.problem_field").map{
case org.apache.spark.sql.Row(x : String) => x
})
problem_field: org.apache.spark.sql.DataFrame = [x: bigint, y: bigint]          

scala> problem_field.printSchema
root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)

Если схема содержимого problem_field несовместима между строками, это решение все еще будет работать, но может не быть оптимальным способом обработки вещей, поскольку оно создаст разреженный Dataframe, где каждая строка содержит каждое поле, встречающееся в problem_field. Например:

scala> val json = spark.read.json(Seq("""{"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}""", """{"test":{"id":1,"name":"name","problem_field": "{\"a\":10,\"b\":20}"}}""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> val problem_field = spark.read.json(json.select($"test.problem_field").map{case org.apache.spark.sql.Row(x : String) => x})
problem_field: org.apache.spark.sql.DataFrame = [a: bigint, b: bigint ... 2 more fields]

scala> problem_field.printSchema
root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)

scala> fixed.select($"test.problem_field.*").show
+----+----+----+----+
|   a|   b|   x|   y|
+----+----+----+----+
|null|null| 100| 200|
|  10|  20|null|null|
+----+----+----+----+

На протяжении сотен, тысяч или миллионов строк вы можете видеть, как это может представлять проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...