вы можете добиться универсальной десериализации, используя TypeReference из пакета com.fasterxml.jackson.core.type
public class KafkaGenericDeserializer<T> implements Deserializer<T> {
private final ObjectMapper mapper;
private final TypeReference<T> typeReference;
public KafkaGenericDeserializer(ObjectMapper mapper, TypeReference<T> typeReference) {
this.mapper = mapper;
this.typeReference = typeReference;
}
@Override
public T deserialize(final String topic, final byte[] data) {
if (data == null) {
return null;
}
try {
return mapper.readValue(data, typeReference);
} catch (final IOException ex) {
throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", ex);
}
}
@Override
public void close() {}
@Override
public void configure(final Map<String, ?> settings, final boolean isKey) {}
}
, используя такой универсальный десериализатор, вы можете создать Serge
:
public static <T> Serde<T> createSerdeWithGenericDeserializer(TypeReference<T> typeReference) {
KafkaGenericDeserializer<T> kafkaGenericDeserializer = new KafkaGenericDeserializer<>(objectMapper, typeReference);
return Serdes.serdeFrom(new JsonSerializer<>(), kafkaGenericDeserializer);
}
здесь JsonSerializer
относится к spring-kafka
зависимости или реализуете свою собственную сериализацию.
после этого вы можете использовать serde при создании потока Kafka:
TypeReference<YourGenericClass<SpecificClass>> typeReference = new TypeReference<YourGenericClass<SpecificClass>>() {};
Serde<YourGenericClass<SpecificClass>> itemSerde = createSerdeWithGenericDeserializer(typeReference);
Consumed<String, YourGenericClass<SpecificClass>> consumed = Consumed.with(Serdes.String(), itemSerde);
streamsBuilder.stream(topicName, consumed);