Я использую данные из kafka через потоковую структурированную искру и пытаюсь записать их в 3 разных источника.Я хочу, чтобы потоки выполнялись последовательно, поскольку логика (в модуле записи) в stream2 (query2) зависит от stream1 (query1).Происходит следующее: запрос2 выполняется перед запросом1 и мои логические прерывания.
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("assign"," {\""+topic+"\":[0]}")
.load()
val query1 = inputDf.selectExpr("CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")
df1.agg(min("offset"), max("offset"))
.writeStream
.foreach(writer)
.outputMode("complete")
.trigger(Trigger.ProcessingTime("2 minutes"))
.option("checkpointLocation", checkpoint_loc1).start()
//result= (derived from some processing over 'inputDf' dataframe)
val query2 = result.select(result("eventdate")).distinct
distDates.writeStream.foreach(writer1)
.trigger(Trigger.ProcessingTime("2 minutes"))
.option("checkpointLocation", checkpoint_loc2).start()
val query3 = result.writeStream
.outputMode("append")
.format("orc")
.partitionBy("eventdate")
.option("path", "/warehouse/test_duplicate/download/data1")
.option("checkpointLocation", checkpoint_loc)
.option("maxRecordsPerFile", 999999999)
.trigger(Trigger.ProcessingTime("2 minutes"))
.start()
spark.streams.awaitAnyTermination()
result.checkpoint()