В целях тестирования я использую следующий пользовательский источник:
class ThrottledSource[T](
data: Array[T],
throttling: Int,
beginWaitingTime: Int = 0,
endWaitingTime: Int = 0
) extends SourceFunction[T] {
private var isRunning = true
private var offset = 0
override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
Thread.sleep(beginWaitingTime)
val lock = ctx.getCheckpointLock
while (isRunning && offset < data.length) {
lock.synchronized {
ctx.collect(data(offset))
offset += 1
}
Thread.sleep(throttling)
}
Thread.sleep(endWaitingTime)
}
override def cancel(): Unit = isRunning = false
и использую его следующим образом в своем тесте
val controlStream = new ThrottledSource[Control](
data = Array(c1,c2), endWaitingTime = 10000, throttling = 0,
)
val dataStream = new ThrottledSource[Event](
data = Array(e1,e2,e3,e4,e5),
throttling = 1000,
beginWaitingTime = 2000,
endWaitingTime = 2000,
)
val dataStream = env.addSource(events)
env.addSource(controlStream)
.connect(dataStream)
.process(MyProcessFunction)
Я хочу получить все элементы управления во-первых (поэтому я не указываю ни beginWaitingTime
, ни throttling
). В processElement1
и processElement2
в MyProcessFunction я печатаю элементы, когда получаю их. В большинстве случаев сначала я получаю два элемента управления, как и ожидалось, но для меня, как это ни удивительно, время от времени я сначала получаю элементы данных, несмотря на двухсекундную задержку, которая используется источником данных для начала излучения его элементов. Кто-нибудь может мне это объяснить?