I am using Spark 2.3.2 and reading a multi-line JSON file. This is the output of df.printSchema():
root
|-- data: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- context: struct (nullable = true)
| | | | |-- environment: struct (nullable = true)
| | | | | |-- tag: struct (nullable = true)
| | | | | | |-- weather: string (nullable = true)
| | | | | |-- weather: struct (nullable = true)
| | | | | | |-- clouds: double (nullable = true)
| | | | | | |-- rain: long (nullable = true)
| | | | | | |-- temp: long (nullable = true)
| | | | |-- personal: struct (nullable = true)
| | | | | |-- activity: struct (nullable = true)
| | | | | | |-- conditions: array (nullable = true)
| | | | | | | |-- element: string (containsNull = true)
| | | | | | |-- kind: string (nullable = true)
| | | | | |-- status: struct (nullable = true)
| | | | | | |-- speed: double (nullable = true)
| | | | |-- timespace: struct (nullable = true)
| | | | | |-- geo: struct (nullable = true)
| | | | | | |-- coordinates: array (nullable = true)
| | | | | | | |-- element: double (containsNull = true)
| | | | | | |-- type: string (nullable = true)
| | | | | |-- tag: struct (nullable = true)
| | | | | | |-- season: string (nullable = true)
| | | | | |-- timestamp: string (nullable = true)
| | | |-- passport: struct (nullable = true)
| | | | |-- pid: string (nullable = true)
| | | | |-- uid: string (nullable = true)
As you can see, the JSON file has a nested structure, and it is not straightforward to extract specific nested fields, such as season, speed, etc.
This is how I read the data:
SparkSession spark = SparkSession.builder()
.config("spark.rdd.compress", "true")
.appName("Test")
.master("local[*]")
.getOrCreate();
df = spark
.read()
.option("multiLine", true).option("mode", "PERMISSIVE")
.json(filePath);
How can I get the timestamp and weather tags into a separate dataset?
timestamp weather
... ...
... ...
I tried this, but it did not work:
df.registerTempTable("df");
Dataset<Row> result = spark.sql("SELECT data.items.element.passport.uid FROM df");
or:
Dataset<Row> result = df.withColumn("items",
org.apache.spark.sql.functions.explode(df.col("data.items")))
.select(df.col("items.context.environment.weather"));