Я только начинаю обработку потока, используя Apache Flink, дело в том, что я получаю поток Json, который выглядит следующим образом:
{
token_id: “tok_afgtryuo”,
ip_address: “128.123.45.1“,
device_fingerprint: “abcghift”,
card_hash: “hgtyuigash”,
“bin_number”: “424242”,
“last4”: “4242”,
“name”: “Seu Jorge”
}
И спросили, я мог бы выполнить следующие бизнес-правила:
Отклонить, если количество токенов> 5 для этого IP за последние 10 секунд
Отклонить, если число токены> 15 для этого IP в последнюю минуту
Отклонить, если количество токенов> 60 для этого IP в последний час
Я сделал 2 класса , main
класс, когда я создаю экземпляр для вызова функции Window
с различными параметрами, чтобы избежать дублирования кода:
Main. java
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//This DataStream Would be Converting the Json to a Token Object
DataStream<Token> baseStream =
env.addSource(new SocketTextStreamFunction("localhost",
9999,
"\n",
1))
.map(new MapTokens());
// 1- First rule Decline if number of tokens > 5 for this IP in last 10 seconds
DataStreamSink<String> response1 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.seconds(10),
5, "seconds").print();
//2 -Decline if number of tokens > 15 for this IP in last minute
DataStreamSink<String> response2 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.minutes(1),
62, "minutes").print();
//3- Decline if number of tokens > 60 for this IP in last hour
DataStreamSink<String> response3 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.hours(1),
60, "Hours").print();
env.execute("Job2");
}
И другой класс где я делаю все логи c для правил, я подсчитываю время, когда появляется IP-адрес, и, если он больше разрешенного числа во временном окне, я возвращаю сообщение с некоторой информацией:
Rulemaker. java
public class RuleMaker {
public DataStream<String> getStreamKeyCount(DataStream<Token> stream,
String tokenProp,
Time time,
Integer maxPetitions,
String ruleType){
return
stream
.flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
@Override
public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
String tokenSelection = "";
switch (tokenProp)
{
case "ip":
tokenSelection = token.getIpAddress();
break;
case "device":
tokenSelection = token.getDeviceFingerprint();
break;
case "cardHash":
tokenSelection = token.getCardHash();
break;
}
collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
}
})
.keyBy(0)
.timeWindow(time)
.process(new MyProcessWindowFunction(maxPetitions, ruleType));
}
//Class to process the elements from the window
private class MyProcessWindowFunction extends ProcessWindowFunction<
Tuple3<String, Integer, String>,
String,
Tuple,
TimeWindow
> {
private Integer _maxPetitions;
private String _ruleType;
public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
this._maxPetitions = maxPetitions;
this._ruleType = ruleType;
}
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {
Integer counter = 0;
for (Tuple3<String, Integer, String> element : iterable) {
counter += element.f1++;
if(counter > _maxPetitions){
out.collect("El elemeto ha sido declinado: " + element.f2 + " Num elements: " + counter + " rule type: " + _ruleType + " token: " + element.f0 );
counter = 0;
}
}
}
}
}
Пока, я думаю, этот код работает, но я Я начинаю с Apache Flink, и я буду очень признателен, если вы скажете мне, если что-то не так с тем, как я пытаюсь работать с этим, и укажите мне правильное направление.
Большое спасибо.