Невозможно применить шаблон по ключу в KeyedStream с помощью Flink CEP - PullRequest
0 голосов
/ 02 марта 2020

Я пытаюсь отправить предупреждение для каждых 2 событий с одним и тем же идентификатором. Для этого я использую keyBy(), чтобы применить свои операции в соответствии с идентификатором.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.{TimeCharacteristic, scala}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._
import org.apache.flink.cep.CEP
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.datastream.{DataStream, KeyedStream}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

class Pipeline {

    //{...}

    var env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    //{...}

    var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input").name("Stream original")

    var tupleStream = stream.map(new S2TMapFunction())
    var newTupleStream : DataStream[(String,Double,Double,String,Int,Int)] = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())
    newTupleStream = newTupleStream.process(new RemoveLateDataProcessFunction).keyBy(new TupleKeySelector())

    Pattern.begin[(String,Double,Double,String,Int,Int)]("teste").where(new EventoTesteConditionFunction(2)).times(1)

    val patternStream = CEP.pattern(newTupleStream,pattern)

    val result = patternStream.process(new EventoTestePatternProcessFunction())

    result.writeAsText("/home/luca/Desktop/output",FileSystem.WriteMode.OVERWRITE)

    env.execute()

}

Вот мой TupleKeySelector:

import org.apache.flink.api.java.functions.KeySelector

class TupleKeySelector() extends KeySelector[(String,Double,Double,String,Int,Int),Int]{

    override def getKey(value: (String, Double, Double, String, Int, Int)): Int = {

        value._6

    }
}

Вот мой PlacasPunctualTimestampAssigner :

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class PlacasPunctualTimestampAssigner extends AssignerWithPunctuatedWatermarks[(String,Double,Double,String,Int,Int)]{

    var counter : Int = 0
    var eventsUntilNextWatermark : Int = 0
    var lastTimestamp : Long = _

    override def checkAndGetNextWatermark(lastElement: (String, Double, Double, String, Int, Int), extractedTimestamp: Long): Watermark = {

        if(counter == eventsUntilNextWatermark){

            counter = 0

            var time = new Timestamp(lastTimestamp)
            println("Watermark: ",time.toString)
            new Watermark(lastTimestamp)

        }else{

            null

        }

    }

    override def extractTimestamp(element: (String,Double, Double,String,Int,Int), previousElementTimestamp: Long): Long = {


        val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val date = formatter.parse(element._4)
        val timestampAtual = new Timestamp(date.getTime).getTime
        lastTimestamp = Math.max(lastTimestamp,timestampAtual)

        //counter = counter + 1

        timestampAtual

    }
}

Вот мой EventoTesteConditionFunction:

import org.apache.flink.cep.pattern.conditions.IterativeCondition

class EventoTesteConditionFunction(var counter : Int) extends IterativeCondition[(String,Double,Double,String,Int,Int)] {

    private var c  : Int = 0

    override def filter(value: (String, Double, Double, String, Int, Int), ctx: IterativeCondition.Context[(String, Double, Double, String, Int, Int)]): Boolean = {

        if(c == counter-1){

            println("Event: "+value._6.toString)

            c = 0

            true

        }else{

            c = c + 1

            false

        }

    }
}

В соответствии с семантикой, принятой Flink, я должен достичь своей цели, используя квантификатор times(1) с моим шаблоном, или даже пропуская это. Тем не менее, это не то, что я получил в своем выводе. Вот мой тестовый пример:

//Input events:

(name,timestamp,id) 
(a,2013-02-08 00:14:00,1)
(b,2013-02-08 00:15:00,1) // Event should be emitted here
(c,2013-02-08 00:15:10,1)
(d,2013-02-08 00:17:15,1) // Event should be emitted here
(e,2013-02-08 00:18:20,1)
(f,2013-02-08 00:19:30,0)
(g,2013-02-08 00:20:35.507,2)
(h,2013-02-08 00:21:39.906,2) // Event should be emitted here
(i,2013-02-08 00:22:03.499,2)
(j,2013-02-08 00:23:23.818,2) // Event should be emitted here
(k,2013-02-08 00:24:02.319,2)
(l,2013-02-08 00:25:30.971,0)

//Expected output(I'm returning only the id value):

1
1
2
2

//Real output:

1
1
0
2
2
0

Этот тест показывает, что шаблон не применяется ключом, а вместо этого видит все события как принадлежащие одной и той же группе, что является поведением без ключа DataStream.

Если установить квантификатор на times(2), это то, что я получаю с того же входа:

//Expected output:

1
2

//Real output:

0
0

Я также получаю странные отпечатки с моего EventoTesteConditionFunction. Похоже, что некоторые события проверяются более одного раза, а другие пропускаются.

//println from accepted events ofEventoTesteConditionFunction for times(2) quantifier

Event: (b,1)
Event: (c,1)
Event: (d,1)
Event: (e,1)
Event: (f,0)
Event: (h,2)
Event: (i,2)
Event: (j,2)
Event: (k,2)
Event: (l,0)
Event: (l,0)

Учитывая все это, у меня следующие вопросы:

  • Как я могу применить шаблон с помощью ключи?
  • Почему Flink демонстрирует такое странное поведение?

1 Ответ

0 голосов
/ 03 марта 2020

Шаблон, который вы используете:

Pattern.begin[(String,Double,Double,String,Int,Int)]("teste").where(new EventoTesteConditionFunction(2)).times(1)

Говорит, что вы хотите создать каждую запись, соответствующую вашей функции EventoTesteConditionFunction. В вашем случае вы должны использовать шаблон:

Pattern.begin[(String,Double,Double,String,Int,Int)]("teste").where(new AlwaysTrue()).times(2)

, где AlwaysTrue возвращает true для всех записей.

Причина, по которой вы видите странное поведение, заключается в том, как вы используете переменная c in EventoTesteConditionFunction. Эта переменная не ограничена ключом. Flink будет использовать один экземпляр EventoTesteConditionFunction для одной Задачи, которая обрабатывает подгруппу ключей, а не один ключ. Тем не менее результаты вычислений находятся в области видимости. Внутреннее состояние, определяемое результатом функции, также ограничивается клавишей.

На другой ноте. Для такого случая использования я бы порекомендовал изучить ProcessFunction . В документации вы также можете увидеть похожий пример того, что вы пытаетесь сделать, но с использованием состояния Флинка, которое будет проверено и также привязано к клавише.

...