Я пытаюсь разработать простой процесс мониторинга с помощью библиотеки CEP Flink. Я создаю csv
файл от Кафки и использую его с Flink, и это будет Tuple4.
Пока здесь все отлично работает. Потребитель Flink может отлично использовать данные из Kafka и конвертировать их в Tuple4. Затем я создаю Pattern, PatternStream и, наконец, функцию select () для объекта Datastream.
Видимо, все в порядке, но это не работает. Что-то не так с функцией select ().
Я также создаю POJO для получения окончательных значений. Я добавляю оба кода. Спасибо.
public class LTMonitoringTest {
private static final int LEUKO = 3500;
public static void main(final String[] ARGS) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000);
ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);
DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>
(parameterTool.getRequired("topic"), new SimpleStringSchema(),
parameterTool.getProperties()));
DataStream<Tuple4<Integer, Integer, Integer, Long>> streamTuple = myConsumer
.map(new csv2Tuple())
.assignTimestampsAndWatermarks(new ExtractTimestamp());
streamTuple.print();
Pattern<Tuple4<Integer, Integer, Integer, Long>, ?> warningPattern =
Pattern.<Tuple4<Integer, Integer, Integer, Long>>
begin("first")
.where(new IterativeCondition<Tuple4<Integer, Integer, Integer, Long>>() {
@Override
public boolean filter(Tuple4<Integer, Integer, Integer, Long> value,
Context<Tuple4<Integer, Integer, Integer, Long>> ctx)
throws Exception {
return value.f1 <= LEUKO;
}
});
PatternStream<Tuple4<Integer, Integer, Integer, Long>> tempPatternStream = CEP.pattern(
streamTuple.keyBy(0),
warningPattern);
DataStream<Warning> warnings1 = tempPatternStream.select(
(Map<String, List<Tuple4<Integer, Integer, Integer, Long>>> pattern) -> {
Tuple4<Integer, Integer, Integer, Long> first =
(Tuple4<Integer, Integer, Integer, Long>) pattern.get("first").get(0);
return new Warning(first.f0, first.f1);
}
);
warnings1.print();
env.execute("CEP monitoring job");
}
private static class ExtractTimestamp extends
AscendingTimestampExtractor<Tuple4<Integer, Integer, Integer, Long>> {
@Override
public long extractAscendingTimestamp(Tuple4<Integer, Integer, Integer, Long> element) {
return element.f3;
}
}
public static class csv2Tuple implements MapFunction<String,
Tuple4<Integer, Integer, Integer, Long>> {
@Override
public Tuple4<Integer, Integer, Integer, Long> map(String str) {
String[] temp = str.split(",");
return new Tuple4<>(
Integer.parseInt(temp[0]),
Integer.parseInt(temp[1]),
Integer.parseInt(temp[2]),
Long.parseLong(temp[3]));
}
}
}
Получает в качестве аргументов (для kafka konsumer):
- тема lt-мониторинга --bootstrap.servers localhost: 9092 --zookeeper.connect localhost: 2181 --group.id myGroup
И POJO:
public class Warning {
private int ID;
private int Level;
public Warning(int ID, int Level) {
this.ID = ID;
this.Level = Level;
}
/**
* Sets new ID.
*
* @param ID New value of ID.
*/
public void setID(int ID) {
this.ID = ID;
}
/**
* Gets ID.
*
* @return Value of ID.
*/
public int getID() {
return ID;
}
/**
* Gets Level.
*
* @return Value of Level.
*/
public int getLevel() {
return Level;
}
/**
* Sets new Level.
*
* @param Level New value of Level.
*/
public void setLevel(int Level) {
this.Level = Level;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Warning) {
Warning other = (Warning) obj;
return ID == other.ID && Level == other.Level;
} else {
return false;
}
}
@Override
public int hashCode() {
return 41 * ID + Double.hashCode(Level);
}
@Override
public String toString() {
return "Warning{" +
"ID=" + ID + getID() +
", Level=" + Level +
'}';
}
* *} Тысяча двадцать-один
Так что, пожалуйста, подумайте, почему он не работает?
Кстати, это ошибка:
Исключительная ситуация в потоке "main" ; II [I [ILorg / Apache / Flink / API / общий / TypeInfo / TypeInformation; Ljava / языки / Строка, Z) Lorg / Apache / Flink / API / общий / TypeInfo / TypeInformation;
в org.apache.flink.cep.PatternStream.select (PatternStream.java:96)
на LTMonitoringTest.main (LTMonitoringTest.java:78)