Преобразовать и сохранить Spark DStream в нескольких отдельных местах параллельно? - PullRequest
1 голос
/ 25 сентября 2019

У меня есть сценарий использования DStream, который содержит данные с несколькими уровнями вложенности, и у меня есть требование сохранять различные элементы из этих данных в отдельных местах HDFS.Мне удалось решить эту проблему с помощью Spark SQL, как показано ниже:

  val context = new StreamingContext(sparkConf, Seconds(duration))
  val stream = context.receiverStream(receiver)
  stream.foreachRDD {rdd =>
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
    import spark.implicits._
    rdd.toDF.drop("childRecords").write.parquet("ParentTable")
  }
  stream.foreachRDD {rdd =>
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
    import spark.implicits._
    rdd.toDF.select(explode(col("childRecords")).as("children"))
      .select("children.*").write.parquet("ChildTable")
  }
  // repeat as necessary if parent table has more different kinds of child records,
  // or if child table itself also has child records too

Код работает, но единственная проблема, с которой я сталкиваюсь, состоит в том, что постоянство выполняется последовательно - первый поток .foreachRDD должензавершить до начала второго и т. д. В идеале я бы хотел, чтобы задание постоянного хранения для ChildTable запускалось, не дожидаясь завершения ParentTable, поскольку они пишут в разные места и не будут конфликтовать.На самом деле, у меня есть около 10 различных заданий, все из которых ожидают последовательного выполнения, и, вероятно, я бы увидел значительное улучшение во времени выполнения, если бы смог выполнить их все параллельно.

...