Я новичок во Flink и столкнулся со странным поведением, которое я не понимаю.
Вот базовый пример моего кода:
ObjectMapper objectMapper = new ObjectMapper();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.addSource(MY_KAFKA_SOURCE)
.map(m -> {
String myStr = objectMapper.writeValueAsString(m);
return myStr.getBytes();
})
.addSink(MY_ELASTICSEARCH_SINK);
try {
env.execute();
}catch (Exception e){
LOGGER.error(e.getMessage());
}
Когда я это делаю, мойКод работает, как и ожидалось, в местном.Но когда я пытаюсь запустить его на кластере Flink (через панель управления Flink Web), я получаю следующую ошибку (и ничего более):
org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
После некоторых испытаний я обнаружил две вещи, которые яне знаю, как интерпретировать:
- У меня всегда есть эта ошибка, когда я инициализирую
new ObjectMapper();
, даже если он не использовался.(Если я удаляю ObjectMapper, код запускается без ошибок, но он мне нужен :)) - Когда я объявляю свой ObjectMapper внутри функции map, код работает хорошо.
В моем коде у меня также есть пользовательская функция Map, объявленная в другом классе и используемая следующим образом:
public class MyCustomFunction implements MapFunction<Row, Map<String, Object>> {
@SuppressWarnings("unchecked")
public CustomRowToMapFunction() {
// Do something
}
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> map(Row elt) throws Exception {
// Do something
}
}
env.addSource(MY_KAFKA_SOURCE)
.map(new MyCustomFunction())
.map(m -> {
String myStr = objectMapper.writeValueAsString(m);
return myStr.getBytes();
})
.addSink(MY_ELASTICSEARCH_SINK);
Так же, как и ранее, если я инициализирую new ObjectMapper();
в конструкторе, и даже если онне используется или только в конструкторе, я получаю ошибку Flink.Если я инициализирую ObjectMapper в функции map (), я не получаю ошибки.
Как вы можете понять, я немного растерялся в этом.Если у кого-то есть идея или объяснение того, что здесь происходит?
Примечания:
- Я использую Flink 1.7.2 (пробовал с Flink 1.8.0)
- ObjectMapper из FasterXML Jackson Core 2.9.9