Бросок приложения Flink Реализация IterativeCondition не сериализуема - PullRequest
0 голосов
/ 12 декабря 2018

Я пытаюсь запустить созданное мной приложение Flink, и по какой-то причине возникает ошибка

The implementation of the IterativeCondition is not serializable. The object probably contains or references non serializable fields.
    org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
    org.apache.flink.cep.pattern.Pattern.where(Pattern.java:153)
    com.agt.engine.PatternCreator.initialEventPattern(PatternCreator.java:94)
    com.agt.engine.EventSequence.createPatternSequence(EventSequence.java:38)
    com.agt.MainRun.main(MainRun.java:49)

initialEventPattern выглядит примерно так

  public Map<String, Pattern> initialEventPattern(EventPattern event, Map<Integer, VariableFilter> variableFilterCondition ) throws Exception {
    Map<String, Pattern> beginPatternMap = new HashMap<>();
    String patternName = "start";
    Method beginMethod = this.patternClass.getDeclaredMethod("begin", String.class, AfterMatchSkipStrategy.class);
    Object inst = beginMethod.invoke(new GraphMap(), patternName, this.afterMatchSkipStrategy);
    Pattern beginPattern = ((Pattern) inst).where(new IterativeCondition<GraphMap>() {
        @Override
        public boolean filter(GraphMap graphMap, Context<GraphMap> context) throws Exception {
            getPatternVariables(event);
            return filterVariables(graphMap, event.getSubject(), event.getPredicate()) &&
                    filterTriples(graphMap, event.getSubject());
        }
    });

Я видел эту ошибку ранее, когда я разрабатывал это.Но я смог удалить это, либо сделав необходимые классы сериализуемыми, либо сделав функции внутри итеративного условия статическими.Я пытаюсь создать шаблоны Flink (Flink-CEP) динамически, используя Java-отражения.До сих пор я управлял своей средой Flink непосредственно из основного класса.И это работало нормально и было достаточно до сих пор, так как я просто просматривал журналы, созданные при запуске.Но сейчас я пытаюсь развернуть его в Flink и увидеть некоторые метрики на панели Apache Flink.Я пытаюсь сделать это следующим образом (с уже запущенным flink (1.6.0), я использую flink-1.6.0 в mypom.xml)

bin/flink run -c com.sample.MainRun /path/to/target/flink-extension-1.0.jar

Я получаю ошибку, упомянутую выше.Это кажется очень странным, потому что я могу запустить его из своей IDE, запустив основной класс.Но когда я упаковываю его (mvn clean package) и запускаю его на flink, я получаю сериализуемую ошибку.

Любая помощь будет принята с благодарностью.Я сейчас нахожусь в трудном положении.

Заранее спасибо.

...