сериализовать объект для кафки во флинк - PullRequest
0 голосов
/ 21 мая 2019

Я пытаюсь использовать flink для чтения данных из kafka, выполнить некоторую функцию и вернуть результат в другую тему Kafka, но получаю следующую ошибку. `org.apache.flink.api.common.InvalidProgramException: реализация MapFunction не сериализуема. Объект, вероятно, содержит или ссылается на непериализуемые поля.

` Я получаю сообщения от Кафки - делаю какие-то манипуляции с ним и возвращает список объектов, которые я хочу отправить в другую тему.

class Wrapper implements Serializable{
        @JsonProperty("viewBuilderRequests")
        private ArrayList<ViewBuilderRequest> viewBuilderRequests;

        public Wrapper(){}

        public Wrapper(ArrayList<ViewBuilderRequest> viewBuilderRequests) {
            this.viewBuilderRequests = viewBuilderRequests;
        }

        public List<ViewBuilderRequest> getViewBuilderRequests() {
            return viewBuilderRequests;
        }

        public void setViewBuilderRequests(ArrayList<ViewBuilderRequest> viewBuilderRequests) {
            this.viewBuilderRequests = viewBuilderRequests;
        }
    }



public class ViewBuilderRequest implements Serializable {
    private CdmId cdmId
    private ViewBuilderOperation operation
    private List<ViewUserSystemIdentifier> viewUserSystemIdentifiers
    public ViewBuilderRequest(){
}

    public CdmId getCdmId() {
        return cdmId;
    }

    public void setCdmId(CdmId cdmId) {
        this.cdmId = cdmId;
    }

    public ViewBuilderOperation getOperation() {
        return operation;
    }

    public void setOperation(ViewBuilderOperation operation) {
        this.operation = operation;
    }

    public List<ViewUserSystemIdentifier> getViewUserSystemIdentifiers() {
        return viewUserSystemIdentifiers;
    }

    public void setViewUserSystemIdentifiers(List<ViewUserSystemIdentifier> viewUserSystemIdentifiers) {
        this.viewUserSystemIdentifiers = viewUserSystemIdentifiers;
    }

    public enum ViewBuilderOperation implements Serializable{
        Create, Update,Delete
    }




private MapFunction<String, Wrapper> parseAndSendToGraphProcessing = s ->{
    UserMatchingRequest userMatchingRequest = objectMapper.readValue(s, UserMatchingRequest.class);
    Wrapper wrapper = new Wrapper(janusGraphDataProcessing.handleMessage(userMatchingRequest));
    return wrapper;
};

внутренние классы тоже реализуют Serializable

исключение выдается из этого кода:

dataStream.map(parseAndSendToGraphProcessing) .addSink(new FlinkKafkaProducer<Wrapper>(kafkaConfiguration.getBootstrapServers(),"graphNotifications",new WrapperSchema()));

У меня также есть де / сериализация для обоих объектов.

public class WrapperSchema implements DeserializationSchema<Wrapper>, SerializationSchema<Wrapper> {
//        private final static ObjectMapper objectMapper = new ObjectMapper().configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);

    static ObjectMapper objectMapper = new ObjectMapper();

    @Override
        public Wrapper deserialize(byte[] message) throws IOException {
            return objectMapper.readValue(message, Wrapper.class);
        }

        @Override
        public boolean isEndOfStream(Wrapper nextElement) {
            return false;
        }

        @Override
        public byte[] serialize(Wrapper element) {
//            return element.toString().getBytes();
            if(objectMapper == null) {
                objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
                objectMapper = new ObjectMapper();
            }
            try {
                String json = objectMapper.writeValueAsString(element);
                return json.getBytes();
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }

            return new byte[0];
        }

        @Override
        public TypeInformation<Wrapper> getProducedType() {
            return TypeInformation.of(Wrapper.class);
        }
}

1 Ответ

0 голосов
/ 22 мая 2019

Для работы flink ваши сообщения и функции вашей карты должны быть сериализуемыми.

Похоже, ваши сообщения сериализуются, насколько я могу судить.

Но ваша функция карты - нет. Иногда трудно заставить лямбду быть сериализуемой. Я думаю, что в вашем случае проблема в том, что parseAndSendToGraphProcessing использует objectMapper и janusGraphDataProcessing, которые должны быть сериализуемыми.

Я предполагаю, что janusGraphDataProcessing не сериализуем (OjbectMapper, если вы используете Джексон 2.1 или новее).

Если это так, то одним из способов является написание собственного класса RichMapFunction, который будет хранить janusGraphDataProcessing в качестве переходной переменной и инициализировать его в своей функции open.

private MapFunction<String, Wrapper> parseAndSendToGraphProcessing = s ->{
    UserMatchingRequest userMatchingRequest = objectMapper.readValue(s, UserMatchingRequest.class);
    Wrapper wrapper = new Wrapper(janusGraphDataProcessing.handleMessage(userMatchingRequest));
    return wrapper;
};
...