Запись нескольких потоков последовательно в потоковой структурированной искре - PullRequest
0 голосов
/ 21 февраля 2019

Я использую данные из kafka через потоковую структурированную искру и пытаюсь записать их в 3 разных источника.Я хочу, чтобы потоки выполнялись последовательно, поскольку логика (в модуле записи) в stream2 (query2) зависит от stream1 (query1).Происходит следующее: запрос2 выполняется перед запросом1 и мои логические прерывания.

val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("assign"," {\""+topic+"\":[0]}") 
  .load()

 val query1 = inputDf.selectExpr("CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")
df1.agg(min("offset"), max("offset"))
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .option("checkpointLocation", checkpoint_loc1).start()


 //result= (derived from some processing over 'inputDf' dataframe)

  val query2 = result.select(result("eventdate")).distinct   
 distDates.writeStream.foreach(writer1)
 .trigger(Trigger.ProcessingTime("2 minutes"))
 .option("checkpointLocation", checkpoint_loc2).start()


 val query3 = result.writeStream
  .outputMode("append")
  .format("orc")
  .partitionBy("eventdate")
  .option("path", "/warehouse/test_duplicate/download/data1")
  .option("checkpointLocation", checkpoint_loc)
  .option("maxRecordsPerFile", 999999999)
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .start()

  spark.streams.awaitAnyTermination()
  result.checkpoint()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...