Другой результат при запуске Flink в локальном режиме и кластере пряжи - PullRequest
0 голосов
/ 24 апреля 2018

Я запускаю код, используя Flink Java API, который получает несколько байтов от Kafka и анализирует его, вставляя в базу данных Cassandra с использованием другой библиотеки статический метод (анализ и вставка результатов выполняется библиотекой). Запустив код в локальной среде в IDE, я получаю нужный ответ, но при работе в кластере YARN метод синтаксического анализа не работает должным образом!

public class Test {
    static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();

    public static void main(String[] args) throws Exception {

        CassandraConnection.connect();
        Parser.setInsert(true);

        stream.flatMap(new FlatMapFunction<byte[], Void>() {
            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {
                Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
                // Parser.parse(ByteBuffer.wrap(value));
            }
        });
        env.execute();
    }
}

В классе Parser есть статическое поле HashMap, в котором конфигурация анализа данных основана на его информации, и данные будут вставлять его во время выполнения. Проблема, связанная с запуском YARN, заключалась в том, что эти данные были недоступны для taskmanagers, и они просто выводят config is not available!

Поэтому я переопределяю этот HashMap в качестве параметра для метода parse, но без различий в результатах!

Как я могу решить проблему?

1 Ответ

0 голосов
/ 28 апреля 2018

Я изменил статические методы и поля на нестатические, и использование RichFlatMapFunction решило проблему.

stream.flatMap(new RichFlatMapFunction<byte[], Void>() {
            CassandraConnection con = new CassandraConnection();
            int i = 0 ;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                con.connect();
            }

            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {

                ByteBuffer tb = ByteBuffer.wrap(value);
                np.parse(tb, ConfigHashMap, con);
            }
        });
...