Мы разрабатываем приложение Spark Structured Streaming, в котором первичный потоковый набор данных дважды объединяется с двумя разными наборами данных Stati c. И результатом этих двух присоединений является объединение в качестве окончательного результата. Здесь выполняется первичный потоковый набор данных twice
из-за двух объединений. И каждое из этих выполнений рендерит разные результаты (внешний вызов API связан с одним из этапов, который изменяется со временем).
До использования структурированной потоковой передачи мы использовали потоковую передачу Spark, и здесь мы используем persist
первичный набор данных, чтобы он не вычислялся дважды при удалении такого рода ошибок. К сожалению, мне не удалось найти такой подход со структурированной потоковой передачей, поскольку каждый раз, когда я пытаюсь сохранить первичный потоковый набор данных, мы получаем следующую ошибку:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("Spark-Structured-Streaming")
.master("local[4]")
.getOrCreate()
val primaryStreamingDataset = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
.as[String]
.map(id => {
// API Call to retrieve data which changes with time
val data = ..
DataModel(id = id, data = data)
})
//.persist() /* Ideal approach would be to persist the primary DS but that fails with above error */
// First execution of primaryStreamingDataset
val domainXDataSet = primaryStreamingDataset.joinWith(..)
// Second execution of primaryStreamingDataset
val domainYDataSet = primaryStreamingDataset.joinWith(..)
val domainQuery = domainXDataSet.union(domainYDataSet)
.writeStream
.outputMode("append")
.format("console")
.start()
domainQuery.awaitTermination()
}
PS: Мы не можем выполнять последовательные соединения с обоими наборами данных stati c одно за другим, чтобы избежать повторного выполнения, поскольку проблема имеет довольно много других осложнений (плоская карта фильтра по предоставленным идентификаторам, которые различаются для обоих доменов)
В соответствии с вышеуказанным требованием, как добиться желаемого результата в Spark Structured Streaming?