События повторяются автоматически, даже если в источнике события не указаны: Flink Cep - PullRequest
0 голосов
/ 10 июля 2020

Я пытался детально изучить flink cep, однако попал в неприятность, которая кажется очень странной. Я загружал определенные данные о событиях в kafka topi c и получал сообщения от этого topi c и обнаруживал закономерности с помощью flink cep. Опять же, получая сообщения от kafka, я выводил их на стандартный вывод. Однако, как ни странно, даже несмотря на то, что я не вставляю никаких сообщений в kafka topi c, старые сообщения продолжают выводиться на стандартный вывод. Ниже мой код -:

package com.anishIND97
import java.util.Properties

import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.api.scala._
import org.apache.flink.cep.pattern.Quantifier.Times
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
case class Scheme(ID:String,value:Int)
case class warning(ID:String)

object cepExploration {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.enableCheckpointing(3000) // checkpoint every 3000 msec
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //by default
    //val executionConfig = new ExecutionConfig
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    properties.setProperty("enable.auto.commit", "true")
    val myConsumer = new FlinkKafkaConsumer("kafka_flink", new SimpleStringSchema(), properties)
    //myConsumer.setStartFromEarliest()      // start from the earliest record possible
    myConsumer.setStartFromLatest()
    val lines = env.addSource(myConsumer)
    val data = lines.map(value => {
      val sep = value.split(",")
      Scheme(sep(0), sep(1).toInt)
    })
    val pattern1 = Pattern.begin[Scheme]("start").where(v=>{ v.ID=="45"}).times(2).within(Time.seconds(10))
    val patternStream= CEP.pattern(data,pattern1)
    val warn=patternStream.select(v=>{
      (v.get("start").get.head,v.get("second").get)
    })
    data.print("Data:")
    warn.print("warning:")
    env.execute("Exploration")
  }
}

ввод -:

>45,3
>34,2
>45,2
>45,2
>45,2
>45,2

вывод -:

Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(45,3)
Data:> Scheme(45,3)
Data:> Scheme(45,3)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
...