Flink CEP не работает во время события, но работает во время обработки - PullRequest
0 голосов
/ 21 февраля 2020

Когда я использую код Flink CEP для времени обработки (которое является конфигурацией по умолчанию), я могу получить требуемое совпадение скороговорки, но при настройке env для Event Time я не могу получить сопоставление с любым шаблоном.

 def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(3000) // checkpoint every 3000 msec
     val lines = env.addSource(consumerKafkaSource.consume("bank_transaction_2", "192.168.2.201:9092", "192.168.2.201:2181", "http://192.168.2.201:8081"))

  val eventdate = ExtractAndAssignEventTime.assign(lines, "unix", "datetime", 3) //Extracting date time here

    val event = eventdate.keyBy(v => v.get("customer_id").toString.toInt)
   val pattern1 = Pattern.begin[GenericRecord]("start").where(v=>v.get("state").toString=="FAILED").next("d").where(v=>v.get("state").toString=="FAILED")
      val patternStream = CEP.pattern(event, pattern1)
    val warnID = patternStream.sideOutputLateData(latedata).select(value =>  {
      val v = value.mapValues(c => c.toList.toString)
      Json(DefaultFormats).write(v).replace("\\\"", "\"")
        //.replace("List(","{").replace(")","}")
    })
    val latedatastream = warnID.getSideOutput(latedata)
    latedatastream.print("late_data")


    warnID.print("warning")
    event.print("event")

Код извлечения метки времени

object ExtractAndAssignEventTime {
  def assign(stream:DataStream[GenericRecord],timeFormat:String,timeColumn:String,OutofOrderTime:Int ):DataStream[GenericRecord] ={
    if(!(timeFormat.equalsIgnoreCase("Unix"))){
      val EventTimeStream=stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(3)) {
        override def extractTimestamp(t: GenericRecord): Long = {
          new java.text.SimpleDateFormat(timeFormat).parse(t.get(timeColumn).toString).getTime
        }
      })
      EventTimeStream
    }
    else{
      val EventTimeStream=stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(OutofOrderTime)) {
        override def extractTimestamp(t: GenericRecord): Long = {
          (t.get(timeColumn).toString.toLong)
        }
      })
      EventTimeStream
    }
  }

Пожалуйста, помогите мне решить эту проблему. Заранее спасибо.!

Ответы [ 2 ]

0 голосов
/ 21 февраля 2020

У меня была такая же проблема, и я «решил» ее только сейчас, но ответ, как вы увидите, не имеет особого смысла (по крайней мере, для меня).

Объяснение:

В моем исходном коде у меня было это:

var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
env.getConfig.setAutoWatermarkInterval(1)

...

var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input")


var tupleStream = stream.map(new S2TMapFunction())
tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())

val pattern = Pattern.begin[(String,Double,Double,String,Int,Int)]("follow").where(new SameRegionFunction())

val patternStream = CEP.pattern(newTupleStream,pattern)

val result = patternStream.process(new MyPatternProcessFunction())

В соответствии с моей регистрацией, я увидел, что ни SameRegionFunction, ни MyPatternProcessFunction не выполнялись, что является очень неожиданным, если не сказать больше .

Ответ:

Поскольку я был не в курсе, я решил проверить создание своего потока go с помощью еще одной функции преобразования, просто чтобы проверить, действительно ли мои события вставляются в поток. Итак, я отправил tupleStream в операцию карты, сгенерировав newTupleStream, например:

var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
env.getConfig.setAutoWatermarkInterval(1)

...

var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input")


/* I created 'DoNothingMapFunction', where the output event = input event*/
var tupleStream = stream.map(new S2TMapFunction())
var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner()).map(new DoNothingMapFunction())


val pattern = Pattern.begin[(String,Double,Double,String,Int,Int)]("follow").where(new SameRegionFunction())

val patternStream = CEP.pattern(newTupleStream,pattern)

val result = patternStream.process(new MyPatternProcessFunction())

А потом SameRegionFunction и MyPatternProcessFunction решили выполнить.

Obs:

Я изменил строку:

var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner()).map(new DoNothingMapFunction())

на эту:

var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())

, и это также сработало. Видимо, достаточно просто еще одного уровня косвенности, чтобы заставить его работать, хотя мне не ясно, почему это происходит.

0 голосов
/ 21 февраля 2020

Поскольку вы используете AssingerWithPeriodicWatermark Вам также необходимо настроить setAutowatermarkInterval, чтобы Flink использовал этот интервал для создания водяных знаков.

Вы можете сделать это, позвонив по номеру env.getConfig.setAutoWatermarkInterval([interval]).

Для времени события CEP основывается на водяных знаках, поэтому, если они не генерируются, то в основном не будет выходных данных.

...