Я внедряю процесс для создания сообщений kafka, и каждое сообщение должно иметь схему, проверенную реестром схемы. Для разработки я использую kafka и Schema Registry, используя docker, и мои схемы регистрируются пользовательским интерфейсом реестра схем.
Похоже, что мои схемы не проверяются или мне не хватает какой-либо конфигурации. Мой класс продюсера имеет следующий код:
package br.com.xx.realtime_transformation.producers;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
public class KafkaEventProducer {
private final String bootstrapServers;
private final String schemaRegistryUrl;
private final KafkaProducer<String, GenericRecord> kafkaProducer;
public KafkaEventProducer(String bootstrapServers, String schemaRegistryUrl) {
this.bootstrapServers = bootstrapServers;
this.schemaRegistryUrl = schemaRegistryUrl;
this.kafkaProducer = getProducer();
}
private KafkaProducer<String, GenericRecord> getProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl);
return new KafkaProducer<>(props);
}
public void send(String topic, GenericRecord event) {
try {
String key = UUID.randomUUID().toString();
final ProducerRecord producerRecord = new ProducerRecord<>(topic, key, event);
this.kafkaProducer.send(producerRecord);
} catch (final SerializationException e) {
e.printStackTrace();
}
}
}
В большинстве случаев я получаю сообщение об ошибке типа "Схема не найдена", и когда это исключение не генерируется, мое сообщение не проверяется, оно просто отправляет сообщение на другую топи c.
Отсутствует ли какая-либо конфигурация?