Мы создали поток данных для kafka и используем flink sql, чтобы отловить сценарий для правила eventA после eventB.
Теперь я использовал sql для proctime, правило работает нормально. Но я использовал время события, оно не может работать.
Я пытался использовать водяной знак, назначенный потребителю кафки.
ниже указан код отправителя
public static JsonObject generateEvent() throws ParseException {
String[] nameStrings = new String[] { "A", "B", "C", "D" };
Integer[] countArrays = new Integer[] { 50, 100, 150, 200 };
Long baseTime2 = (long)1557720000;
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("name", getRandomElement(nameStrings).toString());
jsonObject.addProperty("amount", getRandomElement(countArrays).toString());
jsonObject.addProperty("rowtime", randomDate(baseTime2));
jsonObject.addProperty("correlationID", random.nextInt(20));
return jsonObject;
}
ниже - квитанция sql:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
configConsumer();
consumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SQLEvent>(Time.milliseconds(10)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(SQLEvent element) {
return element.getRowtime();
}
});
DataStream<SQLEvent> dataStream = env.addSource(consumer);
dataStream.print();
tEnv.registerDataStream("Events", dataStream, "name, amount, rowtime.rowtime, correlationID");
// tEnv.registerDataStream("Events", dataStream, "name, amount, proctime.proctime, correlationID");
String sqlQuery = "SELECT name,rowtime FROM Events "
+ "MATCH_RECOGNIZE "
// + "(ORDER BY proctime "
+ "(ORDER BY rowtime "
+ "MEASURES e1.name as name,rowtime as rowtime "
+ "AFTER MATCH SKIP PAST LAST ROW "
+ "PATTERN (e1 e2) "
+ "DEFINE e1 as e1.name = 'A',e2 as e2.name = 'B'"
+ ")";
Я хочу, чтобы eventtime работал как proctime.