Как сохранить DSStream из предыдущего пакета и использовать в следующем: Spark Streaming - PullRequest
0 голосов
/ 31 мая 2019

У меня есть два потока данных, и я присоединяюсь к ним каждую минуту.Те записи, которые не объединены, будут объединены с записями в следующем пакете.Это означает, что для каждой итерации у меня есть два выхода: одна объединенная запись, а другая - без сшиванияобъединенная запись будет сохранена в каталоге, а несшитая запись будет использоваться в следующей итерации.

Я кеширую это, но в следующей итерации этот rdd пуст.

Я пытался кешировать rdd и использовалв следующей итерации.

def getUnmatchedRecord(stream1: DStream[(String,String)],stream2: DStream[(String,String)],joinedStream:DStream[(String,String)]):(DStream[(String,String)],DStream[(String,String)])={

    val unmathcedStream1=stream1.leftOuterJoin(joinedStream).filter(x=>x._2._2==None).map(x=>(x._1,x._2._1))
    unmathcedStream1.foreachRDD(x=>{
      println(x.cache().count())
    })

    val 
 unmathcedStream2=stream2.leftOuterJoin(joinedStream).filter(x=>x._2._2==None).map(x=>(x._1,x._2._1))
    unmathcedStream2.foreachRDD(x=>{
      println(x.cache().count())
    })
    (unmathcedStream1,unmathcedStream2)
  }

def main(args: Array[String]): Unit = {
//
//

val t=ssc.queueStream(new mutable.Queue[RDD[(String,String)]])
var unmatchedRecord=getUnmatchedRecord(t,t,t,t)
val stream1 = ssc.textFileStream("").join(unmatchedRecord._1)
val stream2 = ssc.textFileStream("").join(unmatchedRecord._2)

val finalResult=stream1.join(stream2)

unmatchedRecord=getUnmatchedRecord(stream1 ,stream2,finalResult)
...