Flink Kafka Connector - поддержка JsonSmile в FlinkKafkaConsumer - PullRequest
0 голосов
/ 13 февраля 2019

Когда я запускаю следующий код:

public SourceFunction<ObjectNode> get() {
        return new FlinkKafkaConsumer<>(topic,
                new AbstractDeserializationSchema<ObjectNode>() {
                    @Override
                    public ObjectNode deserialize(byte[] message) throws IOException {
                        ObjectReader objectReader =
                                new ObjectMapper()
                                        .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
                                        .readerFor(JsonNode.class)
                                        .withFormatDetection(
                                                new ObjectMapper().readerFor(com.fasterxml.jackson.databind.JsonNode.class),
                                                new ObjectMapper(new SmileFactory()).readerFor(JsonNode.class));

                        return objectReader.readValue(message);
                    }
                },
                properties);

//        return new FlinkKafkaConsumer<>(topic,
//                                            new JSONKeyValueDeserializationSchema(false),
//                                            properties);
    }

Я получаю следующую ошибку:

  org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields


.

Все вышеперечисленное является классами Джексона, кроме возвращаемого типа ObjectNode из пакета flink.

Как разобрать улыбку json от Кафки в исходном соединителе Кафки Флинка?

...