Проверяйте сообщения Kafka, используя Schema Registry API - PullRequest
2 голосов
/ 28 апреля 2020

Я внедряю процесс для создания сообщений kafka, и каждое сообщение должно иметь схему, проверенную реестром схемы. Для разработки я использую kafka и Schema Registry, используя docker, и мои схемы регистрируются пользовательским интерфейсом реестра схем.

Похоже, что мои схемы не проверяются или мне не хватает какой-либо конфигурации. Мой класс продюсера имеет следующий код:

package br.com.xx.realtime_transformation.producers;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;

public class KafkaEventProducer {

    private final String bootstrapServers;
    private final String schemaRegistryUrl;
    private final KafkaProducer<String, GenericRecord> kafkaProducer;

    public KafkaEventProducer(String bootstrapServers, String schemaRegistryUrl) {
        this.bootstrapServers = bootstrapServers;
        this.schemaRegistryUrl = schemaRegistryUrl;
        this.kafkaProducer = getProducer();
    }

    private KafkaProducer<String, GenericRecord> getProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl);

        return new KafkaProducer<>(props);
    }

    public void send(String topic, GenericRecord event) {
        try {
            String key = UUID.randomUUID().toString();
            final ProducerRecord producerRecord = new ProducerRecord<>(topic, key, event);

            this.kafkaProducer.send(producerRecord);
        } catch (final SerializationException e) {
            e.printStackTrace();
        }

    }

}

В большинстве случаев я получаю сообщение об ошибке типа "Схема не найдена", и когда это исключение не генерируется, мое сообщение не проверяется, оно просто отправляет сообщение на другую топи c.

Отсутствует ли какая-либо конфигурация?

Ответы [ 2 ]

1 голос
/ 28 апреля 2020

Kafka Producer работает с системным реестром и без системного реестра.

без системного реестра - Ниже приведен пример Kafka Producer без системного реестра

https://www.devglan.com/apache-kafka/apache-kafka-java-example

С реестром схем

Параметр schema.registry.url просто указывает, где мы храним схемы. Нам нужно предоставить определение схемы, как показано ниже. Вы не предоставляете определение схемы, но используете реестр схемы, поэтому вы получаете проблему.

{
  "namespace": "com.example",
  "type": "record",
  "name": "Employee",
  "doc" : "Represents an Employee at a company",
  "fields": [
      {"name": "firstName", "type": "string", "doc": "The persons given name"},
      {"name": "lastName", "type": "string"},
      {"name": "age",  "type": "int", "default": -1},
      {"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
      {"name": "phoneNumber",  "type": "string"}
   ]
} 

Рабочий процесс реестра схемы показан ниже.

enter image description here

Подробное объяснение можно найти по следующему URL. https://mapr.com/docs/61/Kafka/KafkaSchemaRegistry/KafkaSchemaRegistryDemo.html https://aseigneurin.github.io/2018/08/02/kafka-tutorial-4-avro-and-schema-registry.html https://www.confluent.jp/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/

0 голосов
/ 29 апреля 2020

Я не уверен, что понимаю вопрос, но вы, кажется, спрашиваете о проверке схемы.

Это , обрабатываемое в версии 5.4

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...