Краткое объяснение того, чего я хочу достичь: я хочу сделать функциональные тесты для топологии потока Кафки (используя TopologyTestDriver) для записей avro.
Проблемы: Не удается «смоделировать» schemaRegistry для автоматизации публикации / чтения схемы
До сих пор я пытался использовать MockSchemaRegistryClient, чтобы попытаться смоделировать schemaRegistry, но я не знаю, какчтобы связать его с Avro Serde.
Код
public class SyncronizerIntegrationTest {
private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());
MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
@Test
void integrationTest() throws IOException, RestClientException {
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); //Dunno if this do anything? :/
StreamsBuilder kStreamBuilder = new StreamsBuilder();
Serde<Tracking> avroSerde = getAvroSerde();
mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());
KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
"topic",
Consumed.with(Serdes.String(), avroSerde));
unmappedOrdersStream
.filter((k, v) -> v != null).to("ouput");
Topology topology = kStreamBuilder.build();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));
}
}
Метод AvroSerde
private <T extends SpecificRecord> Serde<T> getAvroSerde() {
// Configure Avro ser/des
final Map<String,String> avroSerdeConfig = new HashMap<>();
avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");
final Serde<T> avroSerde = new SpecificAvroSerde<>();
avroSerde.configure(avroSerdeConfig, false); // `false` for record values
return avroSerde;
}
Если я запускаю тест без testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));
, он работает хорошо (выглядит каквсе правильно установлено)
Но
Когда я пытаюсь вставить данные ( pipeInput ), выдается следующее исключение: Объект «Отслеживание» заполнен полностью.
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)
Отредактировано, я не удалил это, для "журнала истории", чтобы указать путь.