Несколько Apache Flink windows проверок - PullRequest
5 голосов
/ 05 февраля 2020

Я только начинаю обработку потока, используя Apache Flink, дело в том, что я получаю поток Json, который выглядит следующим образом:

{

  token_id: “tok_afgtryuo”,

  ip_address: “128.123.45.1“,

  device_fingerprint: “abcghift”,

  card_hash: “hgtyuigash”,

  “bin_number”: “424242”,

  “last4”: “4242”,

  “name”: “Seu Jorge”

}

И спросили, я мог бы выполнить следующие бизнес-правила:

  • Отклонить, если количество токенов> 5 для этого IP за последние 10 секунд

  • Отклонить, если число токены> 15 для этого IP в последнюю минуту

  • Отклонить, если количество токенов> 60 для этого IP в последний час

Я сделал 2 класса , main класс, когда я создаю экземпляр для вызова функции Window с различными параметрами, чтобы избежать дублирования кода:

Main. java

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //This DataStream Would be  Converting the Json to a Token Object
        DataStream<Token> baseStream =
                env.addSource(new SocketTextStreamFunction("localhost",
                        9999,
                        "\n",
                        1))
                        .map(new MapTokens());


        // 1- First rule Decline if number of tokens > 5 for this IP in last 10 seconds
       DataStreamSink<String> response1 =  new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.seconds(10),
               5, "seconds").print();

        //2 -Decline if number of tokens > 15 for this IP in last minute
        DataStreamSink<String> response2 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.minutes(1),
                62, "minutes").print();

        //3- Decline if number of tokens > 60 for this IP in last hour
        DataStreamSink<String> response3  = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.hours(1),
                60, "Hours").print();

        env.execute("Job2");
    }

И другой класс где я делаю все логи c для правил, я подсчитываю время, когда появляется IP-адрес, и, если он больше разрешенного числа во временном окне, я возвращаю сообщение с некоторой информацией:

Rulemaker. java

public class RuleMaker {


    public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                                String tokenProp,
                                                Time time, 
                                                Integer maxPetitions, 
                                                String ruleType){

        return
               stream
                .flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
                    @Override
                    public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {

                         String tokenSelection = "";
                        switch (tokenProp)
                        {
                            case "ip":
                                tokenSelection = token.getIpAddress();
                                break;
                            case "device":
                                tokenSelection = token.getDeviceFingerprint();
                                break;
                            case "cardHash":
                                tokenSelection = token.getCardHash();
                                break;
                        }
                        collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
                    }
                })
                .keyBy(0)
                .timeWindow(time)
                .process(new MyProcessWindowFunction(maxPetitions, ruleType));
    }

    //Class to process the elements from the window
    private class MyProcessWindowFunction extends ProcessWindowFunction<
            Tuple3<String, Integer, String>,
            String,
            Tuple,
            TimeWindow
            > {

        private Integer _maxPetitions;
        private String  _ruleType;


        public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
            this._maxPetitions = maxPetitions;
            this._ruleType = ruleType;
        }

        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {

            Integer counter = 0;
            for (Tuple3<String, Integer, String> element : iterable) {
                counter += element.f1++;
                if(counter > _maxPetitions){
                    out.collect("El elemeto ha sido declinado: " + element.f2 + " Num elements: " + counter + " rule type: " +  _ruleType + " token: " + element.f0 );
                    counter = 0;
                }
            }
        }
    }
}

Пока, я думаю, этот код работает, но я Я начинаю с Apache Flink, и я буду очень признателен, если вы скажете мне, если что-то не так с тем, как я пытаюсь работать с этим, и укажите мне правильное направление.

Большое спасибо.

1 Ответ

3 голосов
/ 07 февраля 2020

Общий подход выглядит очень хорошо, хотя я бы подумал, что Table API будет достаточно мощным, чтобы помочь вам (более кратко), который поддерживает Json из коробки.

Если вы хотите придерживаться API DataStream, в getStreamKeyCount следует заменить переключатель вокруг tokenProp передачей ключевого экстрактора в getStreamKeyCount, чтобы иметь только одно место для добавления новых правил.

public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                            KeySelector<Token, String> keyExtractor,
                                            Time time, 
                                            Integer maxPetitions, 
                                            String ruleType){

    return stream
         .map(token -> new Tuple3<>(keyExtractor.getKey(token), 1, token.get_tokenId()))
            .keyBy(0)
            .timeWindow(time)
            .process(new MyProcessWindowFunction(maxPetitions, ruleType));
}

Тогда вызов становится

DataStreamSink<String> response2 = ruleMaker.getStreamKeyCount(baseStream, 
    Token::getIpAddress, Time.minutes(1), 62, "minutes");
...