Я запускаю код, используя Flink Java API, который получает несколько байтов от Kafka и анализирует его, вставляя в базу данных Cassandra с использованием другой библиотеки статический метод (анализ и вставка результатов выполняется библиотекой). Запустив код в локальной среде в IDE, я получаю нужный ответ, но при работе в кластере YARN метод синтаксического анализа не работает должным образом!
public class Test {
static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();
public static void main(String[] args) throws Exception {
CassandraConnection.connect();
Parser.setInsert(true);
stream.flatMap(new FlatMapFunction<byte[], Void>() {
@Override
public void flatMap(byte[] value, Collector<Void> out) throws Exception {
Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
// Parser.parse(ByteBuffer.wrap(value));
}
});
env.execute();
}
}
В классе Parser
есть статическое поле HashMap, в котором конфигурация анализа данных основана на его информации, и данные будут вставлять его во время выполнения. Проблема, связанная с запуском YARN, заключалась в том, что эти данные были недоступны для taskmanagers
, и они просто выводят config is not available!
Поэтому я переопределяю этот HashMap в качестве параметра для метода parse
, но без различий в результатах!
Как я могу решить проблему?