Как реализовать универсальный десериализатор Kafka Streams - PullRequest
0 голосов
/ 24 мая 2018

Мне нравится Кафка, но я ненавижу писать много сериализаторов / десериализаторов, поэтому я попытался создать GenericDeserializer<T>, который мог бы десериализовать универсальный тип T.

Вот моя попытка:

class GenericDeserializer< T > implements Deserializer< T > {
    static final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }
    @Override
    public T deserialize( String topic, byte[] data) {
            T result = null;
            try {
                    result = ( T )( objectMapper.readValue( data, T.class ) );
            }
            catch ( Exception e ) {
                    e.printStackTrace();
            }
            return result;
    }
    @Override
    public void close() {
    }
}

Однако Java-компилятор (Eclipse) жалуется на строку

result = ( T )( objectMapper.readValue( data, T.class ) );

с сообщением Illegal class literal for the type parameter T.

Вопросы:

  1. Можете ли выобъясните, пожалуйста, значение сообщения?
  2. Есть ли способ обойти это, чтобы получить желаемый эффект?

Ответы [ 2 ]

0 голосов
/ 21 октября 2018

вы можете добиться универсальной десериализации, используя TypeReference из пакета com.fasterxml.jackson.core.type

public class KafkaGenericDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper mapper;
    private final TypeReference<T> typeReference;

    public KafkaGenericDeserializer(ObjectMapper mapper, TypeReference<T> typeReference) {
        this.mapper = mapper;
        this.typeReference = typeReference;
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }

        try {
            return mapper.readValue(data, typeReference);
        } catch (final IOException ex) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", ex);
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(final Map<String, ?> settings, final boolean isKey) {}
}

, используя такой универсальный десериализатор, вы можете создать Serge:

public static <T> Serde<T> createSerdeWithGenericDeserializer(TypeReference<T> typeReference) {
    KafkaGenericDeserializer<T> kafkaGenericDeserializer = new KafkaGenericDeserializer<>(objectMapper, typeReference);
    return Serdes.serdeFrom(new JsonSerializer<>(), kafkaGenericDeserializer);
}

здесь JsonSerializer относится к spring-kafka зависимости или реализуете свою собственную сериализацию.

после этого вы можете использовать serde при создании потока Kafka:

TypeReference<YourGenericClass<SpecificClass>> typeReference = new TypeReference<YourGenericClass<SpecificClass>>() {};
Serde<YourGenericClass<SpecificClass>> itemSerde = createSerdeWithGenericDeserializer(typeReference);
Consumed<String, YourGenericClass<SpecificClass>> consumed = Consumed.with(Serdes.String(), itemSerde);
streamsBuilder.stream(topicName, consumed);
0 голосов
/ 24 мая 2018

В Java вы не можете создать экземпляр универсального типа, даже рефлексивно, то есть objectMapper.readValue() не может ничего с T.classПоэтому вам нужно знать, какой класс создать в данной ситуации.Логический способ сделать это состоит в отображении темы -> типа, к которой ваш десериализатор может получить доступ.Примером этого может быть SpecificAvroSerde, который использует реестр схемы слияния (внешний процесс) для идентификации, в какой тип десериализоваться.Вы также можете встроить это отображение в свой код, но в зависимости от вашего варианта использования это не будет особенно надежным.

https://github.com/confluentinc/schema-registry/blob/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/SpecificAvroSerde.java

Мясо SpecificAvroSerde - этонемного глубже - вот кусок, который выполняет работу по запросу реестра схемы, в какой тип он должен декодироваться: https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L109-L139

Конечно, весь этот код затуманен сложностями Avro.Я написал бы пример кода о том, как сделать это в памяти с JSON, если бы у меня было время.

...