У меня есть сценарий использования DStream, который содержит данные с несколькими уровнями вложенности, и у меня есть требование сохранять различные элементы из этих данных в отдельных местах HDFS.Мне удалось решить эту проблему с помощью Spark SQL, как показано ниже:
val context = new StreamingContext(sparkConf, Seconds(duration))
val stream = context.receiverStream(receiver)
stream.foreachRDD {rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
import spark.implicits._
rdd.toDF.drop("childRecords").write.parquet("ParentTable")
}
stream.foreachRDD {rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
import spark.implicits._
rdd.toDF.select(explode(col("childRecords")).as("children"))
.select("children.*").write.parquet("ChildTable")
}
// repeat as necessary if parent table has more different kinds of child records,
// or if child table itself also has child records too
Код работает, но единственная проблема, с которой я сталкиваюсь, состоит в том, что постоянство выполняется последовательно - первый поток .foreachRDD должензавершить до начала второго и т. д. В идеале я бы хотел, чтобы задание постоянного хранения для ChildTable запускалось, не дожидаясь завершения ParentTable, поскольку они пишут в разные места и не будут конфликтовать.На самом деле, у меня есть около 10 различных заданий, все из которых ожидают последовательного выполнения, и, вероятно, я бы увидел значительное улучшение во времени выполнения, если бы смог выполнить их все параллельно.