Когда я запускаю следующий код:
public SourceFunction<ObjectNode> get() {
return new FlinkKafkaConsumer<>(topic,
new AbstractDeserializationSchema<ObjectNode>() {
@Override
public ObjectNode deserialize(byte[] message) throws IOException {
ObjectReader objectReader =
new ObjectMapper()
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.readerFor(JsonNode.class)
.withFormatDetection(
new ObjectMapper().readerFor(com.fasterxml.jackson.databind.JsonNode.class),
new ObjectMapper(new SmileFactory()).readerFor(JsonNode.class));
return objectReader.readValue(message);
}
},
properties);
// return new FlinkKafkaConsumer<>(topic,
// new JSONKeyValueDeserializationSchema(false),
// properties);
}
Я получаю следующую ошибку:
org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields
.
Все вышеперечисленное является классами Джексона, кроме возвращаемого типа ObjectNode из пакета flink.
Как разобрать улыбку json от Кафки в исходном соединителе Кафки Флинка?