У меня была такая же проблема, и я «решил» ее только сейчас, но ответ, как вы увидите, не имеет особого смысла (по крайней мере, для меня).
Объяснение:
В моем исходном коде у меня было это:
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
env.getConfig.setAutoWatermarkInterval(1)
...
var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input")
var tupleStream = stream.map(new S2TMapFunction())
tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())
val pattern = Pattern.begin[(String,Double,Double,String,Int,Int)]("follow").where(new SameRegionFunction())
val patternStream = CEP.pattern(newTupleStream,pattern)
val result = patternStream.process(new MyPatternProcessFunction())
В соответствии с моей регистрацией, я увидел, что ни SameRegionFunction
, ни MyPatternProcessFunction
не выполнялись, что является очень неожиданным, если не сказать больше .
Ответ:
Поскольку я был не в курсе, я решил проверить создание своего потока go с помощью еще одной функции преобразования, просто чтобы проверить, действительно ли мои события вставляются в поток. Итак, я отправил tupleStream
в операцию карты, сгенерировав newTupleStream
, например:
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
env.getConfig.setAutoWatermarkInterval(1)
...
var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input")
/* I created 'DoNothingMapFunction', where the output event = input event*/
var tupleStream = stream.map(new S2TMapFunction())
var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner()).map(new DoNothingMapFunction())
val pattern = Pattern.begin[(String,Double,Double,String,Int,Int)]("follow").where(new SameRegionFunction())
val patternStream = CEP.pattern(newTupleStream,pattern)
val result = patternStream.process(new MyPatternProcessFunction())
А потом SameRegionFunction
и MyPatternProcessFunction
решили выполнить.
Obs:
Я изменил строку:
var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner()).map(new DoNothingMapFunction())
на эту:
var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())
, и это также сработало. Видимо, достаточно просто еще одного уровня косвенности, чтобы заставить его работать, хотя мне не ясно, почему это происходит.