Я использую шаблон широковещания для соединения двух потоков и чтения данных из одного в другой.Код выглядит следующим образом
case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{
override def processBroadcastElement(in2: (String, Double),
context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
collector:Collector[MyObject]):Unit={
context.getBroadcastState(broadcastStateDescriptor).put(in2._1,in2._2)
}
override def processElement(obj: MyObject,
readOnlyContext:BroadCastProcessFunction[MyObject, (String,Double),
MyObject]#ReadOnlyContext, collector: Collector[MyObject]):Unit={
val theValue = readOnlyContext.getBroadccastState(broadcastStateDesriptor).get(obj.prop)
//If I print the context of the state here sometimes it is empty.
out.collect(MyObject(new, properties, go, here))
}
}
Дескриптор состояния:
val broadcastStateDescriptor: MapStateDescriptor[String, Double) = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])
Мой код выполнения выглядит следующим образом.
val streamA :DataStream[MyObject] = ...
val streamB :DataStream[(String,Double)] = ...
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)
streamA.connect(streamB).process(new Broadcast)
Проблема в processElement
Функция состояния иногда пуста, а иногда нет.Состояние должно всегда содержать данные, поскольку я постоянно выполняю потоковую передачу из файла, который, как я знаю, содержит данные.Я не понимаю, почему это сбрасывает состояние, и я не могу получить данные.
Я попытался добавить некоторую печать в processBroadcastElement
до и после перевода данных в состояние, и в результате получается следующее
0 - 1
1 - 2
2 - 3
.. all the way to 48 where it resets back to 0
ОБНОВЛЕНИЕ: кое-что, что я заметил, когда я уменьшаю значение тайм-аута контекста потокового выполнения, результаты немного лучше.когда я его увеличиваю, карта всегда пуста.
env.setBufferTimeout(1) //better results
env.setBufferTimeout(200) //worse result (default is 100)