Общая десериализация в кафке - PullRequest
0 голосов
/ 24 апреля 2019

У меня есть два класса для сериализации и десериализации в кафке.Сериализация работает нормально, но у меня проблема с десериализацией.

Я нашел много решений, но ничего не работает.

Десериализатор с универсальным классом T

public class DeserializerU<T> implements Deserializer<T> {

@Override
public void configure(Map map, boolean bln) {
}

@Override
public void close() {
}

@Override
public T deserialize(String string, byte[] bytes) {
    ObjectMapper mapper = new ObjectMapper();
    T object = null;
    try {
      object = mapper.readValue(bytes, new TypeReference<T>() {});
    } catch (Exception e) {
        e.printStackTrace();
    }
    return object;
}

Сериализатор

public class MyObjectSerializer implements Serializer {

@Override
public void configure(Map map, boolean bln) {
}

@Override
public byte[] serialize(String string, Object t) {
    byte[] retVal = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      retVal = objectMapper.writeValueAsString(t).getBytes();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return retVal;
}

@Override
public void close() {   
} 

Свойства набора десериализатора

Properties props = new Properties();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new DeserializerU<MyOwnObject>().getClass());

Если я заменю «TypeRefence () {}» на определенный тип, то десериализатор работает, но мне нужен десериализатор для многих объектов.Я также пробовал convertValue вместо readValue, но все возвращает LinkedHashMap, который не может быть преобразован в мой объект.Любой совет, как это сделать?Спасибо за помощь

...