Хорошо, спасибо Матиасу, нашел рабочее решение, но я буду благодарен советам по улучшению этого обходного пути.Потому что мне пришлось добавить метод только для тестирования (что мне не нравится).Проблема, как указал Матиас, заключалась в том, что serdes, генерируемые внутри топологии, не указывали на макет схемы.Поэтому я написал установщик serdes и установил «mocked serdes»
Код решения здесь:
Установщик для топологии (хотел бы изменить это ...)
cdlcfStreamsTopology.setSerdes(trackinSerde, logSerde);
Создание авро-серде с макетом схемы
Serde<Tracking> trackinSerde = getAvroSerde(mockSchemaRegistryClient);
Serde<Log> logSerde = getAvroSerde(mockSchemaRegistryClient);
private <T extends SpecificRecord> Serde<T> getAvroSerde(SchemaRegistryClient schemaRegistryClient) {
OwnSpecificAvroSerde serde = new OwnSpecificAvroSerde(schemaRegistryClient,schemaRegistryUrl);
return serde;
}
Мне пришлось создать этот OwnSpecificAvroSerde, чтобы пройти через конструктор mockedSchema.Для этого мне пришлось создать локальный пакет с тем же именем, что и у библиотеки avro, для доступа к классу по умолчанию, в котором есть конструктор схемы.
package io.confluent.kafka.streams.serdes.avro;
public class OwnSpecificAvroSerde<T extends GenericRecord> extends GenericAvroSerde {
private String registryUrl;
// public OwnSpecificAvroSerde(String registryUrl) {
// this.registryUrl=registryUrl;
// }
public OwnSpecificAvroSerde(SchemaRegistryClient schemaRegistryClient,String registryUrl) {
super(schemaRegistryClient);
}
public <T> Serde<T> getAvroSerde(boolean isKey, MockSchemaRegistryClient mockSchemaRegistryClient) {
return Serdes.serdeFrom(getSerializer(isKey,mockSchemaRegistryClient), getDeserializer(isKey,mockSchemaRegistryClient));
}
private <T> Serializer<T> getSerializer(boolean isKey, MockSchemaRegistryClient mockSchemaRegistryClient) {
Map<String, Object> map = new HashMap<>();
map.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, true);
map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
Serializer<T> serializer = (Serializer) new KafkaAvroSerializer(mockSchemaRegistryClient);
serializer.configure(map, isKey);
return serializer;
}
private <T> Deserializer<T> getDeserializer(boolean key, MockSchemaRegistryClient mockSchemaRegistryClient) {
Map<String, Object> map = new HashMap<>();
map.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
Deserializer<T> deserializer = (Deserializer) new KafkaAvroDeserializer(mockSchemaRegistryClient);
deserializer.configure(map, key);
return deserializer;
}
}
И важно, зарегистрировать схемы в реестре фиктивных схем:
mockSchemaRegistryClient.register(getSubjectName(mappedCdlcfTopic,false),Tracking.getClassSchema());
mockSchemaRegistryClient.register(getSubjectName(logsTopic,false), Log.getClassSchema());
Также пришлось импортировать getSubjectName
из библиотеки mockedSchema, чтобы сгенерировать тот же ключ, что и они, для поиска идентификатора схемы.
static String getSubjectName(String topic, boolean isKey) {
return isKey ? topic + "-key" : topic + "-value";
}