Я реализовал поток flink с функцией BroadcastProcessFunction.Из processBroadcastElement я получаю свою модель и применяю ее к своему событию в processElement.
Я не нахожу способ провести модульное тестирование своего потока, так как я не нахожу решение, обеспечивающее отправку модели дона первое событие.Я бы сказал, что есть два способа добиться этого:
1. Найти решение, чтобы модель сначала помещалась в поток
2. Заполните состояние широковещательной передачи приором модели к выполнению потока так, чтобыон восстановлен
Возможно, я что-то пропустил, но я не нашел простого способа сделать это.
Вот простой модульный тест с моей проблемой:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfter, FunSuite}
import scala.collection.mutable
class BroadCastProcessor extends BroadcastProcessFunction[Int, (Int, String), String] {
import BroadCastProcessor._
override def processElement(value: Int,
ctx: BroadcastProcessFunction[Int, (Int, String), String]#ReadOnlyContext,
out: Collector[String]): Unit = {
val broadcastState = ctx.getBroadcastState(broadcastStateDescriptor)
if (broadcastState.contains(value)) {
out.collect(broadcastState.get(value))
}
}
override def processBroadcastElement(value: (Int, String),
ctx: BroadcastProcessFunction[Int, (Int, String), String]#Context,
out: Collector[String]): Unit = {
ctx.getBroadcastState(broadcastStateDescriptor).put(value._1, value._2)
}
}
object BroadCastProcessor {
val broadcastStateDescriptor: MapStateDescriptor[Int, String] = new MapStateDescriptor[Int, String]("int_to_string", classOf[Int], classOf[String])
}
class CollectSink extends SinkFunction[String] {
import CollectSink._
override def invoke(value: String): Unit = {
values += value
}
}
object CollectSink { // must be static
val values: mutable.MutableList[String] = mutable.MutableList[String]()
}
class BroadCastProcessTest extends FunSuite with BeforeAndAfter {
before {
CollectSink.values.clear()
}
test("add_elem_to_broadcast_and_process_should_apply_broadcast_rule") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataToProcessStream = env.fromElements(1)
val ruleToBroadcastStream = env.fromElements(1 -> "1", 2 -> "2", 3 -> "3")
val broadcastStream = ruleToBroadcastStream.broadcast(BroadCastProcessor.broadcastStateDescriptor)
dataToProcessStream
.connect(broadcastStream)
.process(new BroadCastProcessor)
.addSink(new CollectSink())
// execute
env.execute()
CollectSink.values should contain("1")
}
}
Обновление благодаря Дэвиду Андерсону
Я выбрал буферный раствор.Я определил функцию процесса для синхронизации:
class SynchronizeModelAndEvent(modelNumberToWaitFor: Int) extends CoProcessFunction[Int, (Int, String), Int] {
val eventBuffer: mutable.MutableList[Int] = mutable.MutableList[Int]()
var modelEventsNumber = 0
override def processElement1(value: Int, ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
if (modelEventsNumber < modelNumberToWaitFor) {
eventBuffer += value
return
}
out.collect(value)
}
override def processElement2(value: (Int, String), ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
modelEventsNumber += 1
if (modelEventsNumber >= modelNumberToWaitFor) {
eventBuffer.foreach(event => out.collect(event))
}
}
}
И поэтому мне нужно добавить ее в свой поток:
dataToProcessStream
.connect(ruleToBroadcastStream)
.process(new SynchronizeModelAndEvent(3))
.connect(broadcastStream)
.process(new BroadCastProcessor)
.addSink(new CollectSink())
Спасибо