Сохранить необработанный JSON как столбец в Spark DataFrame при чтении / загрузке? - PullRequest
0 голосов
/ 07 мая 2018

Я искал способ добавить свои необработанные (JSON) данные в виде столбца при чтении моих данных в Spark DataFrame. У меня есть один способ сделать это с помощью соединения, но я надеюсь, что есть способ сделать это за одну операцию, используя Spark 2.2.x +.

Например, данные:

{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}
{"team":"Sharks","origin": "San Jose", "eliminated":"true"}
{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}

При выполнении:

val logs = sc.textFile("/Users/vgk/data/tiny.json") // example data file
spark.read.json(logs).show

Как и ожидалось, мы получим:

+--------------+----------+--------------------+--------------+
|        colors|eliminated|              origin|          team|
+--------------+----------+--------------------+--------------+
|gold,red,black|      null|           Las Vegas|Golden Knights|
|          null|      true|            San Jose|        Sharks|
|red,green,gold|      null|           Minnesota|          Wild|
|red,white,blue|     false|District of Columbia|      Capitals|
+--------------+----------+--------------------+--------------+

То, что я хотел бы иметь при начальной загрузке, это выше, но с необработанными данными JSON в качестве дополнительного столбца. Например (усеченные необработанные значения):

+--------------+-------------------------------+--------------+--------------------+
|        colors|eliminated|              origin|          team|               value|
+--------------+----------+--------------------+--------------+--------------------+
|red,white,blue|     false|District of Columbia|      Capitals|{"colors":"red,wh...|
|gold,red,black|      null|           Las Vegas|Golden Knights|{"colors":"gold,r...|
|          null|      true|            San Jose|        Sharks|{"eliminated":"tr...|
|red,green,gold|      null|           Minnesota|          Wild|{"colors":"red,gr...|
+--------------+----------+--------------------+--------------+--------------------+

Неидеальное решение включает объединение:

val logs = sc.textFile("/Users/vgk/data/tiny.json")
val df = spark.read.json(logs).withColumn("uniqueID",monotonically_increasing_id)
val rawdf = df.toJSON.withColumn("uniqueID",monotonically_increasing_id)
df.join(rawdf, "uniqueID")

Что приводит к тому же кадру данных, что и выше, но с добавленным столбцом uniqueID. Кроме того, JSON отображается из DF и не обязательно является «необработанными» данными. На практике они равны, но для моего случая использования реальные исходные данные предпочтительнее.

Кто-нибудь знает решение, которое собирает необработанные данные JSON в качестве дополнительного столбца при загрузке?

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

Вы можете просто использовать to_json встроенную функцию в сочетании с .withColumn функцией как

val logs = sc.textFile("/Users/vgk/data/tiny.json")
val df = spark.read.json(logs)
import org.apache.spark.sql.functions._
df.withColumn("value", to_json(struct(df.columns.map(col): _*))).show(false)

Или еще лучше, не использовать sparkContext textFile для чтения как rdd, просто используйте sparkSession для чтения файла json как

val df = spark.read.json("/Users/vgk/data/tiny.json")

import org.apache.spark.sql.functions._
df.withColumn("value", to_json(struct(df.columns.map(col): _*))).show(false)

и вы должны получить

+--------------+----------+---------+--------------+------------------------------------------------------------------------+
|colors        |eliminated|origin   |team          |value                                                                   |
+--------------+----------+---------+--------------+------------------------------------------------------------------------+
|gold,red,black|null      |Las Vegas|Golden Knights|{"colors":"gold,red,black","origin":"Las Vegas","team":"Golden Knights"}|
|null          |true      |San Jose |Sharks        |{"eliminated":"true","origin":"San Jose","team":"Sharks"}               |
|red,green,gold|null      |Minnesota|Wild          |{"colors":"red,green,gold","origin":"Minnesota","team":"Wild"}          |
+--------------+----------+---------+--------------+------------------------------------------------------------------------+
0 голосов
/ 07 мая 2018

Если у вас есть схема данных, которые вы получаете, то вы можете использовать from_json с schema, чтобы получить все поля и оставить поле raw таким, как оно есть

val logs = spark.sparkContext.textFile(path) // example data file

val schema = StructType(
  StructField("team", StringType, true)::
  StructField("colors", StringType, true)::
  StructField("eliminated", StringType, true)::
  StructField("origin", StringType, true)::Nil
)

logs.toDF("values")
    .withColumn("json", from_json($"values", schema))
    .select("values", "json.*")

    .show(false)

Выход:

+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|values                                                                  |team          |colors        |eliminated|origin   |
+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}|Golden Knights|gold,red,black|null      |Las Vegas|
|{"team":"Sharks","origin": "San Jose", "eliminated":"true"}             |Sharks        |null          |true      |San Jose |
|{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}          |Wild          |red,green,gold|null      |Minnesota|
+------------------------------------------------------------------------+--------------+--------------+----------+---------+

Надеюсь, что он помогает!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...