Как разбить элементы массива столбца на строки потокового информационного кадра в spark - PullRequest
0 голосов
/ 14 октября 2019

У меня есть паркетный файл в учетных записях хранения ADLS Gen2. Я хочу разбить все элементы массива на соответствующие строки, а затем записать его в другое место ADLS Gen2. Изначально мои данные в файле партера выглядели так:

val query3 = spark
.readStream
.schema(readSchema)
.format("parquet")
.load("/mnt/changefeed/EDP_schema")

df = spark.read.parquet("/mnt/changefeed/EDP_schema")
display(df)

+----+----+---------+
|col1|col2|     col3|
+----+----+---------+
|   1|   A|[1, 2, 3]|
|   2|   B|   [3, 5]|
+----+----+---------+

Теперь я преобразую данные.

import org.apache.spark.sql.functions.explode
val df1 = df.withColumn("col3", explode($"col3"))
display(df1)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   1|
|   1|   A|   2|
|   1|   A|   3|
|   2|   B|   3|
|   2|   B|   5|
+----+----+----+

Теперь, когда я собираюсь записать их в другое местоположение ADLS Gen2, я получаюошибка ниже

import org.apache.spark.sql.streaming.Trigger
val writeS = df2
.writeStream
.partitionBy("readS")
.outputMode("append")
.format("parquet")
.option("path", "/mnt/changefeed/EDP_Final_schema")
.option("checkpointLocation", "/checkpoint_parq_merged")
.option("overwriteSchema", true)
.queryName("merge_stream")
.trigger(Trigger.ProcessingTime(10))
.start()


org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;
...