Я пытаюсь запустить созданное мной приложение Flink, и по какой-то причине возникает ошибка
The implementation of the IterativeCondition is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
org.apache.flink.cep.pattern.Pattern.where(Pattern.java:153)
com.agt.engine.PatternCreator.initialEventPattern(PatternCreator.java:94)
com.agt.engine.EventSequence.createPatternSequence(EventSequence.java:38)
com.agt.MainRun.main(MainRun.java:49)
initialEventPattern
выглядит примерно так
public Map<String, Pattern> initialEventPattern(EventPattern event, Map<Integer, VariableFilter> variableFilterCondition ) throws Exception {
Map<String, Pattern> beginPatternMap = new HashMap<>();
String patternName = "start";
Method beginMethod = this.patternClass.getDeclaredMethod("begin", String.class, AfterMatchSkipStrategy.class);
Object inst = beginMethod.invoke(new GraphMap(), patternName, this.afterMatchSkipStrategy);
Pattern beginPattern = ((Pattern) inst).where(new IterativeCondition<GraphMap>() {
@Override
public boolean filter(GraphMap graphMap, Context<GraphMap> context) throws Exception {
getPatternVariables(event);
return filterVariables(graphMap, event.getSubject(), event.getPredicate()) &&
filterTriples(graphMap, event.getSubject());
}
});
Я видел эту ошибку ранее, когда я разрабатывал это.Но я смог удалить это, либо сделав необходимые классы сериализуемыми, либо сделав функции внутри итеративного условия статическими.Я пытаюсь создать шаблоны Flink (Flink-CEP) динамически, используя Java-отражения.До сих пор я управлял своей средой Flink непосредственно из основного класса.И это работало нормально и было достаточно до сих пор, так как я просто просматривал журналы, созданные при запуске.Но сейчас я пытаюсь развернуть его в Flink и увидеть некоторые метрики на панели Apache Flink.Я пытаюсь сделать это следующим образом (с уже запущенным flink (1.6.0), я использую flink-1.6.0 в mypom.xml)
bin/flink run -c com.sample.MainRun /path/to/target/flink-extension-1.0.jar
Я получаю ошибку, упомянутую выше.Это кажется очень странным, потому что я могу запустить его из своей IDE, запустив основной класс.Но когда я упаковываю его (mvn clean package) и запускаю его на flink, я получаю сериализуемую ошибку.
Любая помощь будет принята с благодарностью.Я сейчас нахожусь в трудном положении.
Заранее спасибо.