Как flink sql работает для правил CEP с EventTime? - PullRequest
0 голосов
/ 13 июня 2019

Мы создали поток данных для kafka и используем flink sql, чтобы отловить сценарий для правила eventA после eventB. Теперь я использовал sql для proctime, правило работает нормально. Но я использовал время события, оно не может работать.

Я пытался использовать водяной знак, назначенный потребителю кафки.

ниже указан код отправителя

public static JsonObject generateEvent() throws ParseException {
        String[] nameStrings = new String[] { "A", "B", "C", "D" };
        Integer[] countArrays = new Integer[] { 50, 100, 150, 200 };
        Long baseTime2 = (long)1557720000;
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("name", getRandomElement(nameStrings).toString());
        jsonObject.addProperty("amount", getRandomElement(countArrays).toString());
        jsonObject.addProperty("rowtime", randomDate(baseTime2));
        jsonObject.addProperty("correlationID", random.nextInt(20));
        return jsonObject;
    }

ниже - квитанция sql:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
        configConsumer();
        consumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SQLEvent>(Time.milliseconds(10)) {
            private static final long serialVersionUID = 1L;
            @Override
            public long extractTimestamp(SQLEvent element) {
                return element.getRowtime();
            }
        });

        DataStream<SQLEvent> dataStream = env.addSource(consumer);
        dataStream.print();
        tEnv.registerDataStream("Events", dataStream, "name, amount, rowtime.rowtime, correlationID");
//      tEnv.registerDataStream("Events", dataStream, "name, amount, proctime.proctime, correlationID");
        String sqlQuery = "SELECT name,rowtime FROM Events "
                + "MATCH_RECOGNIZE "
//              + "(ORDER BY proctime "
                + "(ORDER BY rowtime "
                + "MEASURES e1.name as name,rowtime as rowtime "
                + "AFTER MATCH SKIP PAST LAST ROW "
                + "PATTERN (e1 e2) "
                + "DEFINE e1 as e1.name = 'A',e2 as e2.name = 'B'"
                + ")";

Я хочу, чтобы eventtime работал как proctime.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...