Примечание: код написан на Scala, и я использовал Spark Structured Streaming.
Чтобы развернуть (flatten) столбцы-массивы, можно использовать функцию org.apache.spark.sql.functions.explode.
Пожалуйста, посмотрите код ниже.
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"phones","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"models","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"watches","type":{"type":"struct","fields":[{"name":"models","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
schema: org.apache.spark.sql.types.StructType = StructType(StructField(age,LongType,true), StructField(name,StringType,true), StructField(phones,ArrayType(StructType(StructField(models,ArrayType(StringType,true),true), StructField(name,StringType,true)),true),true), StructField(watches,StructType(StructField(models,ArrayType(StringType,true),true), StructField(name,StringType,true)),true))
scala> val streamDF = spark.readStream.format("json").schema(schema).load("/tmp/jdata")
streamDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]
scala> :paste
// Entering paste mode (ctrl-D to finish)
// Flattened columns, in the exact order they are added to the stream.
// Per the schema, `watches` is a single struct, while `phones` is an array of
// structs, so `phones.models` is an array of arrays and `phones.name` is an
// array of strings.
val flattenedColumns = Seq(
  "watches_models" -> explode($"watches.models"),
  "watches_name"   -> $"watches.name",
  // First explode: one row per phone entry (each value is still an array of models).
  "phones_models"  -> explode($"phones.models"),
  // Second explode overwrites the column: one row per individual model string.
  "phones_models"  -> explode($"phones_models"),
  // NOTE(review): phone names are exploded independently of the models, so every
  // model gets paired with every phone name (the sample output shows "iphone X"
  // next to "Samsung" and "Google") — confirm this cross product is intended.
  "phones_name"    -> explode($"phones.name")
)

// Apply the definitions one by one; each step sees the columns added by the previous ones.
val flattened = flattenedColumns.foldLeft(streamDF) {
  case (df, (columnName, expression)) => df.withColumn(columnName, expression)
}

// Drop the original nested columns and print every micro-batch to the console.
val query = flattened
  .drop("watches", "phones")
  .writeStream
  .format("console")
  .outputMode("append")
  .start()

// Block the shell until the streaming query stops.
query.awaitTermination()
// Exiting paste mode, now interpreting.
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+--------------------+------------+--------------+-----------+
|age| name| watches_models|watches_name| phones_models|phones_name|
+---+------+--------------------+------------+--------------+-----------+
| 26| Akash|Apple Watch Series 5| Apple| iphone X| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone X| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone X| Google|
| 26| Akash|Apple Watch Series 5| Apple| iphone XR| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone XR| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone XR| Google|
| 26| Akash|Apple Watch Series 5| Apple| iphone XS| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone XS| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone XS| Google|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11| Google|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11 Pro| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11 Pro| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11 Pro| Google|
| 26| Akash|Apple Watch Series 5| Apple| Galaxy Note10| Apple|
| 26| Akash|Apple Watch Series 5| Apple| Galaxy Note10| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| Galaxy Note10| Google|
| 26| Akash|Apple Watch Series 5| Apple|Galaxy Note10+| Apple|
| 26| Akash|Apple Watch Series 5| Apple|Galaxy Note10+| Samsung|
+---+------+--------------------+------------+--------------+-----------+
only showing top 20 rows