Я использую Flink, и в мою систему поступает поток строк JSON с динамически изменяющимися и вложенными полями. Поэтому я не могу издеваться и конвертировать этот входящий JSON как статический POJO, и вместо этого мне нужно полагаться на карту.
Мое первое преобразование заключается в преобразовании потока строк JSON в поток объекта Map с использованием синтаксического анализа GSON, а затем я оборачиваю карту в DTO с именем Data.
(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);
Data data = new Data(map); // Data has getters, setters for the map and implements Serializable
Проблема возникает, когда сразу после этой обработки преобразования я пытаюсь передать результирующий поток в мой пользовательский приемник Flink. Функция invoke не вызывается в приемнике. Однако приемник работает, если я перехожу с этой Карты, содержащей DTO, на примитив или обычный DTO без Карты.
Мой DTO выглядит так:
public class FakeDTO {
private String id;
private LinkedTreeMap map; // com.google.gson.internal
// getters and setters
// constructors, empty and with fields
Я попробовал дваследующие решения:
env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class;
env.getConfig().disableGenericTypes();
Кто-нибудь из экспертов посоветует мне воспользоваться в этой ситуации?