Ошибка Flink при создании экземпляра Jackson Object Mapper - PullRequest
0 голосов
/ 04 июля 2019

Я новичок во Flink и столкнулся со странным поведением, которое я не понимаю.

Вот базовый пример моего кода:

ObjectMapper objectMapper = new ObjectMapper();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.addSource(MY_KAFKA_SOURCE)
   .map(m -> {
       String myStr = objectMapper.writeValueAsString(m);
       return myStr.getBytes();
   })
   .addSink(MY_ELASTICSEARCH_SINK);

try {
    env.execute();
}catch (Exception e){
    LOGGER.error(e.getMessage());
}

Когда я это делаю, мойКод работает, как и ожидалось, в местном.Но когда я пытаюсь запустить его на кластере Flink (через панель управления Flink Web), я получаю следующую ошибку (и ничего более):

org.apache.flink.client.program.ProgramInvocationException: The program caused an error: 

После некоторых испытаний я обнаружил две вещи, которые яне знаю, как интерпретировать:

  • У меня всегда есть эта ошибка, когда я инициализирую new ObjectMapper();, даже если он не использовался.(Если я удаляю ObjectMapper, код запускается без ошибок, но он мне нужен :))
  • Когда я объявляю свой ObjectMapper внутри функции map, код работает хорошо.

В моем коде у меня также есть пользовательская функция Map, объявленная в другом классе и используемая следующим образом:

public class MyCustomFunction implements MapFunction<Row, Map<String, Object>> {
    @SuppressWarnings("unchecked")
    public CustomRowToMapFunction() {
        // Do something
    }

    @Override
    @SuppressWarnings("unchecked")
    public Map<String, Object> map(Row elt) throws Exception {
        // Do something
    }
}

env.addSource(MY_KAFKA_SOURCE)
   .map(new MyCustomFunction())
   .map(m -> {
       String myStr = objectMapper.writeValueAsString(m);
       return myStr.getBytes();
   })
   .addSink(MY_ELASTICSEARCH_SINK);

Так же, как и ранее, если я инициализирую new ObjectMapper(); в конструкторе, и даже если онне используется или только в конструкторе, я получаю ошибку Flink.Если я инициализирую ObjectMapper в функции map (), я не получаю ошибки.

Как вы можете понять, я немного растерялся в этом.Если у кого-то есть идея или объяснение того, что здесь происходит?

Примечания:

  • Я использую Flink 1.7.2 (пробовал с Flink 1.8.0)
  • ObjectMapper из FasterXML Jackson Core 2.9.9
...