Я пытался детально изучить flink cep, однако попал в неприятность, которая кажется очень странной. Я загружал определенные данные о событиях в kafka topi c и получал сообщения от этого topi c и обнаруживал закономерности с помощью flink cep. Опять же, получая сообщения от kafka, я выводил их на стандартный вывод. Однако, как ни странно, даже несмотря на то, что я не вставляю никаких сообщений в kafka topi c, старые сообщения продолжают выводиться на стандартный вывод. Ниже мой код -:
package com.anishIND97
import java.util.Properties
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.api.scala._
import org.apache.flink.cep.pattern.Quantifier.Times
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
case class Scheme(ID:String,value:Int)
case class warning(ID:String)
object cepExploration {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.enableCheckpointing(3000) // checkpoint every 3000 msec
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //by default
//val executionConfig = new ExecutionConfig
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
properties.setProperty("enable.auto.commit", "true")
val myConsumer = new FlinkKafkaConsumer("kafka_flink", new SimpleStringSchema(), properties)
//myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest()
val lines = env.addSource(myConsumer)
val data = lines.map(value => {
val sep = value.split(",")
Scheme(sep(0), sep(1).toInt)
})
val pattern1 = Pattern.begin[Scheme]("start").where(v=>{ v.ID=="45"}).times(2).within(Time.seconds(10))
val patternStream= CEP.pattern(data,pattern1)
val warn=patternStream.select(v=>{
(v.get("start").get.head,v.get("second").get)
})
data.print("Data:")
warn.print("warning:")
env.execute("Exploration")
}
}
ввод -:
>45,3
>34,2
>45,2
>45,2
>45,2
>45,2
вывод -:
Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(45,3)
Data:> Scheme(45,3)
Data:> Scheme(45,3)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,3)
Data:> Scheme(34,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)
Data:> Scheme(45,2)