Добавление новых правил во время выполнения в apache kafka flink - PullRequest
0 голосов
/ 26 декабря 2018

Я использую FlinkKafka для применения правил в потоке.Ниже приведен пример кода:

ObjectMapper mapper = new ObjectMapper();
List<JsonNode> rulesList = null;
try {
    // Read rule file
    rulesList = mapper.readValue(new File("ruleFile"), new TypeReference<List<JsonNode>>(){});

} catch (IOException e1) {
    System.out.println( "Error reading Rules file.");
    System.exit(-1);
}


for (JsonNode jsonObject : rulesList) {
    String id = (String) jsonObject.get("Id1").textValue();

    // Form the pattern dynamically
    Pattern<JsonNode, ?> pattern = null;
    pattern = Pattern.<JsonNode>begin("start").where(new SimpleConditionImpl(jsonObject.get("rule1")));
    // Create the pattern stream
    PatternStream<JsonNode> patternStream = CEP.pattern(data, pattern);

}

Но проблема в том, что FlinkKafka читает файл только один раз, когда мы запускаем программу, и я хочу, чтобы новые правила динамически добавлялись во время выполнения и применялись к потоку..

Есть ли способ достичь этого во Флинк Кафке?

1 Ответ

0 голосов
/ 26 декабря 2018

Библиотека CEP Флинка (пока) не поддерживает динамические шаблоны.(См. FLINK-7129 .)

Стандартный подход для этого заключается в использовании широковещательного состояния для связи и хранения правил по всему кластеру, но вы будете иметьпридумать способ оценить / выполнить правила.

См. https://training.da -platform.com / упражнения / taxiQuery.html и https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/examples/datastream_java/broadcast/BroadcastState.java для примеров.

...