Загрузка JSON в Spark SQL - PullRequest
       7

Загрузка JSON в Spark SQL

0 голосов
/ 06 марта 2020

Я занимаюсь самообучением около JSON с Spark SQL в v2.1 и использую данные по ссылке

https://catalog.data.gov/dataset/air-quality-measures-on-the-national-environmental-health-tracking-network

Проблема у меня возникает, когда я использую:

val lines = spark.read
.option("multiLine", true).option("mode", "PERMISSIVE")
.json("E:/VW/meta_plus_sample_Data.json")`

Я получаю Spark SQL, возвращающий все данные в виде одной строки.

+--------------------+--------------------+
|                data|                meta|
+--------------------+--------------------+
|[[row-8eh8_xxkx-u...|[[[[1439474950, t...|
+--------------------+--------------------+`

И когда я удаляю:

.option("multiLine", true).option("mode", "PERMISSIVE")

Я получаю ошибку как

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().;

Есть ли возможность достичь этого в Spark SQL с каждой записью из файла как одной строкой в ​​таблице?

1 Ответ

0 голосов
/ 06 марта 2020

Это ожидаемое поведение, так как у нас есть только one record (в ссылке, предоставленной в вопросе) в наличии мета (объекта) и данных (массива).

enter image description here

Как one json запись состоит из нескольких строк, поэтому нам нужно включить опцию multiLine.

spark.read.option("multiLine",true).option("mode","PERMISSIVE").json("tmp.json").show()

//sample data
//+--------------------+--------------------+
//|                data|                meta|
//+--------------------+--------------------+
//|[[row-8eh8_xxkx-u...|[[[[1439474950, t...|
//+--------------------+--------------------+

//access meta struct columns

df.select("meta.view.*").show()

//|           approvals|averageRating|            category|             columns| createdAt|         description|displayType|downloadCount|               flags|              grants|hideFromCatalog|hideFromDataJson|       id|indexUpdatedAt|            metadata|                name|newBackend|numberOfComments|     oid|               owner|provenance|publicationAppendEnabled|publicationDate|publicationGroup|publicationStage|               query|rights|rowClass|rowsUpdatedAt|rowsUpdatedBy|         tableAuthor|tableId|                tags|totalTimesRated|viewCount|viewLastModified|viewType|

//|[[1439474950, tru...|            0|Environmental Hea...|[[, meta_data,, :...|1439381433|The Environmental...|      table|        26159|[default, restora...|[[[public], false...|          false|           false|cjae-szjv|    1528204279|[[table, fatrow, ...|Air Quality Measu...|      true|               0|12801487|[Tracking, 94g5-7...|  official|                   false|     1439474950|         3957835|       published|[[[true, [2171820...|[read]|        |   1439402317|    94g5-7as2|[Tracking, 94g5-7...|3960642|[environmental ha...|              0|     3843|      1528203875| tabular|


//to access data array we need to explode
df.selectExpr("explode(data)").show()
//+--------------------+
//|                 col|
//+--------------------+
//|[row-8eh8_xxkx-u3...|
//|[row-u2v5_78j5-px...|
//|[row-68zj_7qfn-sx...|
//|[row-8b4d~zt5j~da...|
//|[row-5gee.63td_z6...|
//|[row-tzyx.ssxh_pz...|
//|[row-3yj2_u42c_mr...|
//|[row-va7z.p2v8.7p...|
//|[row-r7kk_e3dm-z2...|
//|[row-bnrc~w34s-4a...|
//|[row-ezrk~m5dc_5n...|
//|[row-nyya.dvnz~c6...|
//|[row-dq3i_wt6d_c6...|
//|[row-u6rc-k3mf-cn...|
//|[row-t9c6-4d4b_r6...|
//|[row-vq6r~mxzj-e6...|
//|[row-vxqn-mrpc~5b...|
//|[row-3akn_5nzm~8v...|
//|[row-ugxn~bhax.a2...|
//|[row-ieav.mdz9-m8...|
//+--------------------+

Load multiple json records:

//json array with two records
spark.read.json(Seq(("""
[{"id":1,"name":"a"},
{"id":2,"name":"b"}]
""")).toDS).show()

//as we have 2 json objects and loaded as 2 rows
//+---+----+
//| id|name|
//+---+----+
//|  1|   a|
//|  2|   b|
//+---+----+
...