Dynami c Оценка шаблона с использованием Apache Flink - PullRequest
0 голосов
/ 09 марта 2020

Я новичок ie до Apache Флинк Я пытаюсь провести динамическую c оценку шаблонов в потоке с использованием Flink CEP. Я пытаюсь найти пользователей, которые выполнили следующее действие login, addtocart и logout, и он может обнаружить шаблон, но если я определяю несколько шаблонов, таких как login, выход из системы не может обнаружить шаблон

Ниже приведен мой код

Класс действия

public class Action {

    public int userID;
    public String action;

    public Action() {
    }

    public Action(int userID, String action) {
        this.userID = userID;
        this.action = action;
    }

    public int getUserID() {
        return userID;
    }

    public void setUserID(int userID) {
        this.userID = userID;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    @Override
    public String toString() {
        return "Action [userID=" + userID + ", action=" + action + "]";
    }

}

Класс Pattern

public class Pattern {

    public String firstAction;
    public String secondAction;
    public String thirdAction;

    public Pattern() {

    }

    public Pattern(String firstAction, String secondAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
    }

    public Pattern(String firstAction, String secondAction, String thirdAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
        this.thirdAction = thirdAction;
    }

    public String getFirstAction() {
        return firstAction;
    }

    public void setFirstAction(String firstAction) {
        this.firstAction = firstAction;
    }

    public String getSecondAction() {
        return secondAction;
    }

    public void setSecondAction(String secondAction) {
        this.secondAction = secondAction;
    }

    public String getThirdAction() {
        return thirdAction;
    }

    public void setThirdAction(String thirdAction) {
        this.thirdAction = thirdAction;
    }

    @Override
    public String toString() {
        return "Pattern [firstAction=" + firstAction + ", secondAction=" + secondAction + ", thirdAction=" + thirdAction
                + "]";
    }



}

Основной класс

public class CEPBroadcast {

    public static class PatternEvaluator
            extends KeyedBroadcastProcessFunction<Integer, Action, Pattern, Tuple2<Integer, Pattern>> {

        /**
         * 
         */
        private static final long serialVersionUID = 1L;

        ValueState<String> prevActionState;

        MapStateDescriptor<Void, Pattern> patternDesc;

        @Override
        public void open(Configuration conf) throws IOException {
            prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
            patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
        }

        @Override
        public void processBroadcastElement(Pattern pattern, Context ctx, Collector<Tuple2<Integer, Pattern>> out)
                throws Exception {

            BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
            bcState.put(null, pattern);
            ;

        }

        @Override
        public void processElement(Action action, ReadOnlyContext ctx, Collector<Tuple2<Integer, Pattern>> out)
                throws Exception {
            Pattern pattern = ctx.getBroadcastState(this.patternDesc).get(null);
            String prevAction = prevActionState.value();

            if (pattern != null && prevAction != null) {

                if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(prevAction)
                        && pattern.thirdAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                } else if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }

            prevActionState.update(action.action);

        }

    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Action> actions = env.fromElements(new Action(1001, "login"), new Action(1002, "login"),
                new Action(1003, "login"), new Action(1003, "addtocart"), new Action(1001, "logout"),
                new Action(1003, "logout"));

        DataStream<Pattern> pattern = env.fromElements(new Pattern("login", "logout"));

        KeyedStream<Action, Integer> actionByUser = actions
                .keyBy((KeySelector<Action, Integer>) action -> action.userID);

        MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID,
                Types.POJO(Pattern.class));

        BroadcastStream<Pattern> bcedPattern = pattern.broadcast(bcStateDescriptor);

        DataStream<Tuple2<Integer, Pattern>> matches = actionByUser.connect(bcedPattern)
                .process(new PatternEvaluator());

        matches.flatMap(new FlatMapFunction<Tuple2<Integer, Pattern>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(Tuple2<Integer, Pattern> value, Collector<String> out) throws Exception {

                if (value.f1.thirdAction != null) {
                    out.collect("User ID: " + value.f0 + ",Pattern matched:" + value.f1.firstAction + ","
                            + value.f1.secondAction + "," + value.f1.thirdAction);
                } else {

                    out.collect("User ID: " + value.f0 + ",Pattern matched:" + value.f1.firstAction + ","
                            + value.f1.secondAction);

                }

            }

        }).print();

        env.execute("CEPBroadcast");

    }

}

Если я даю один Pattern для оценки его подачи вывод, как показано ниже

DataStream<Action> actions = env.fromElements(new Action(1001, "login"), new Action(1002, "login"),
                new Action(1003, "login"), new Action(1003, "addtocart"), new Action(1001, "logout"),
                new Action(1003, "logout"));

DataStream<Pattern> pattern = env.fromElements(new Pattern("login", "logout"));

Output: User ID: 1001,Pattern matched:login,logout

Если я пытаюсь дать несколько шаблонов для оценки, как показано ниже, то, что второй шаблон не оценивается, подскажите, как я могу оценить несколько шаблонов, заранее спасибо.

DataStream<Pattern> pattern = env.fromElements(new Pattern ("login","addtocart","logout"),
                new Pattern("login", "logout"));

Output:  User ID: 1003,Pattern matched:login,addtocart,logout

1 Ответ

1 голос
/ 09 марта 2020

Существует несколько причин, по которым это не работает:

(1) Каждый раз, когда у вас есть оператор Flink с несколькими входными потоками, например PatternEvaluator, в вашем приложении вы не можете контролировать как этот оператор будет читать из своих входов. В вашем случае он может полностью потреблять события из потока действий перед чтением шаблонов или наоборот, или он может чередовать два потока. В некотором смысле, вам повезло, что это вообще соответствовало чему-либо.

Решение этой проблемы будет нелегким. Если вы знаете все шаблоны во время компиляции (другими словами, если они на самом деле не являются динамическими c), то вы можете использовать Flink CEP или MATCH_RECOGNIZE из Flink SQL.

Если вы действительно Вам нужны динамические c шаблоны, тогда вам нужно будет найти способ заблокировать поток действий, пока шаблоны не будут прочитаны. Эта топика c («боковые входы») была рассмотрена ранее в других вопросах здесь, на SO. Например, см. Как выполнить модульное тестирование BroadcastProcessFunction в режиме flink, когда processElement зависит от передаваемых данных . (Или вы можете скорректировать свои ожидания и убедиться, что только действия, обработанные после сохранения шаблона, могут быть сопоставлены с этим шаблоном.)

(2) Используя null в качестве ключа при сохранении шаблонов с помощью

bcState.put(null, pattern);

вы перезаписываете первый шаблон вторым, когда он прибывает. Никогда не бывает, чтобы оба шаблона были доступны для сопоставления.

Чтобы сопоставить ввод с двумя разными шаблонами, вам нужно изменить PatternEvaluator для обработки одновременного сопоставления для обоих шаблонов. Это повлечет за собой сохранение обоих шаблонов в состоянии широковещания, учитывая оба из них в processElement и наличие экземпляров prevActionState для обоих шаблонов. Возможно, вы захотите дать идентификаторы шаблонов, использовать эти идентификаторы в качестве ключей в состоянии широковещания и использовать MapState для prevActionState, еще раз с указанием идентификатора шаблона.

Обновление:

Помните, что когда вы используете API DataStream для написания потокового задания, вы не определяете последовательность выполнения, как это было бы в типичном процедурном приложении. Вместо этого вы описываете топологию графа потока данных и поведение операторов, встроенных в этот граф, которые будут выполнять задание (которое будет выполняться параллельно).

...