Apache Flink: как мне использовать поток Java Map (или Map, содержащую DTO)? - PullRequest
0 голосов
/ 02 октября 2019

Я использую Flink, и в мою систему поступает поток строк JSON с динамически изменяющимися и вложенными полями. Поэтому я не могу издеваться и конвертировать этот входящий JSON как статический POJO, и вместо этого мне нужно полагаться на карту.

Мое первое преобразование заключается в преобразовании потока строк JSON в поток объекта Map с использованием синтаксического анализа GSON, а затем я оборачиваю карту в DTO с именем Data.

(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);

Data data = new Data(map); // Data has getters, setters for the map and implements Serializable

Проблема возникает, когда сразу после этой обработки преобразования я пытаюсь передать результирующий поток в мой пользовательский приемник Flink. Функция invoke не вызывается в приемнике. Однако приемник работает, если я перехожу с этой Карты, содержащей DTO, на примитив или обычный DTO без Карты.

Мой DTO выглядит так:

public class FakeDTO {
    private String id;
    private LinkedTreeMap map; // com.google.gson.internal

    // getters and setters
    // constructors, empty and with fields

Я попробовал дваследующие решения:

env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class; 
env.getConfig().disableGenericTypes();

Кто-нибудь из экспертов посоветует мне воспользоваться в этой ситуации?

1 Ответ

0 голосов
/ 03 октября 2019

Мне удалось решить эту проблему. В моих журналах Flink я видел, что один файл Kryo с именем ReflectionSerializerFactory class не был найден. Я обновил версию Kryo в maven и использовал тип карты для своей карты, которую в документации Flink говорится, что Flink поддерживает.

Просто убедитесь, что в вашем коде указаны универсальные типы, и добавьте геттеры и сеттеры в ваши POJO для Карт.

Я также использую декретрацию типа .returns (xyz.class), чтобы избежать эффектов стирания типа.

...