Структурированная потоковая передача Spark с Persist - PullRequest
0 голосов
/ 26 мая 2020

Мы разрабатываем приложение Spark Structured Streaming, в котором первичный потоковый набор данных дважды объединяется с двумя разными наборами данных Stati c. И результатом этих двух присоединений является объединение в качестве окончательного результата. Здесь выполняется первичный потоковый набор данных twice из-за двух объединений. И каждое из этих выполнений рендерит разные результаты (внешний вызов API связан с одним из этапов, который изменяется со временем).

До использования структурированной потоковой передачи мы использовали потоковую передачу Spark, и здесь мы используем persist первичный набор данных, чтобы он не вычислялся дважды при удалении такого рода ошибок. К сожалению, мне не удалось найти такой подход со структурированной потоковой передачей, поскольку каждый раз, когда я пытаюсь сохранить первичный потоковый набор данных, мы получаем следующую ошибку:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

def main(args: Array[String]): Unit = {
  val spark = SparkSession
      .builder
      .appName("Spark-Structured-Streaming")
      .master("local[4]")
      .getOrCreate()

  val primaryStreamingDataset = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .map(id => {
       // API Call to retrieve data which changes with time 
       val data = .. 
       DataModel(id = id, data = data)
      })
      //.persist() /* Ideal approach would be to persist the primary DS but that fails with above error */

  // First execution of primaryStreamingDataset
  val domainXDataSet = primaryStreamingDataset.joinWith(..)

  // Second execution of primaryStreamingDataset
  val domainYDataSet = primaryStreamingDataset.joinWith(..)

  val domainQuery = domainXDataSet.union(domainYDataSet)
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

  domainQuery.awaitTermination()
}

PS: Мы не можем выполнять последовательные соединения с обоими наборами данных stati c одно за другим, чтобы избежать повторного выполнения, поскольку проблема имеет довольно много других осложнений (плоская карта фильтра по предоставленным идентификаторам, которые различаются для обоих доменов)

В соответствии с вышеуказанным требованием, как добиться желаемого результата в Spark Structured Streaming?

...