У меня есть два потока данных, и я присоединяюсь к ним каждую минуту.Те записи, которые не объединены, будут объединены с записями в следующем пакете.Это означает, что для каждой итерации у меня есть два выхода: одна объединенная запись, а другая - без сшиванияобъединенная запись будет сохранена в каталоге, а несшитая запись будет использоваться в следующей итерации.
Я кеширую это, но в следующей итерации этот rdd пуст.
Я пытался кешировать rdd и использовалв следующей итерации.
def getUnmatchedRecord(stream1: DStream[(String,String)],stream2: DStream[(String,String)],joinedStream:DStream[(String,String)]):(DStream[(String,String)],DStream[(String,String)])={
val unmathcedStream1=stream1.leftOuterJoin(joinedStream).filter(x=>x._2._2==None).map(x=>(x._1,x._2._1))
unmathcedStream1.foreachRDD(x=>{
println(x.cache().count())
})
val
unmathcedStream2=stream2.leftOuterJoin(joinedStream).filter(x=>x._2._2==None).map(x=>(x._1,x._2._1))
unmathcedStream2.foreachRDD(x=>{
println(x.cache().count())
})
(unmathcedStream1,unmathcedStream2)
}
def main(args: Array[String]): Unit = {
//
//
val t=ssc.queueStream(new mutable.Queue[RDD[(String,String)]])
var unmatchedRecord=getUnmatchedRecord(t,t,t,t)
val stream1 = ssc.textFileStream("").join(unmatchedRecord._1)
val stream2 = ssc.textFileStream("").join(unmatchedRecord._2)
val finalResult=stream1.join(stream2)
unmatchedRecord=getUnmatchedRecord(stream1 ,stream2,finalResult)