Настроить коннектор Apache Kafka JDBC - PullRequest
0 голосов
/ 05 июля 2019

Я хочу отправить данные, отправленные в тему, в базу данных postgresql.Поэтому я следую этому руководству и настроил файл свойств следующим образом:

name=transaction-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=transactions
connection.url=jdbc:postgresql://localhost:5432/db
connection.user=db-user
connection.password=
auto.create=true
insert.mode=insert
table.name.format=transaction
pk.mode=none

Я запускаю соединитель с

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-quickstart-postgresql.properties

Соединитель приемника созданно не запускается из-за этой ошибки:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Схема находится в avro-формате и зарегистрирована, и я могу отправлять (производить) сообщения в тему и читать (потреблять) из нее.Но я не могу отправить его в базу данных.

Это мой ./etc/schema-registry/connect-avro-standalone.properties

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Это продюсер, подающий тему с помощью java-api:

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

try (KafkaProducer<String, Transaction> producer = new KafkaProducer<>(properties)) {
    Transaction transaction = new Transaction();
    transaction.setFoo("foo");
    transaction.setBar("bar");
    UUID uuid = UUID.randomUUID();
    final ProducerRecord<String, Transaction> record = new ProducerRecord<>(TOPIC, uuid.toString(), transaction);
    producer.send(record);
}

Я проверяю, что данные правильно сериализованы и десериализованы с использованием

./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic transactions \
    --from-beginning --max-messages 1

База данных запущена и работает.

1 Ответ

1 голос
/ 05 июля 2019

Это не правильно:

Неизвестный магический байт может быть из-за id-поля, не являющегося частью схемы

Что означает эта ошибка, что сообщение по теме не было сериализовано с использованием сериализатора реестра Avro.

Как вы размещаете данные по теме?

Возможно, все сообщения имеют проблему, может быть, только некоторые, но по умолчанию это остановит задачу Kafka Connect.

Вы можете установить

"errors.tolerance":"all",

чтобы заставить его игнорировать сообщения, которые он не может десериализовать. Но если все они не правильно сериализованы в Avro, это не поможет, и вам нужно правильно сериализовать их или выбрать другой конвертер (например, если это действительно JSON, используйте JSONConverter).

Эти ссылки должны помочь вам больше:


Редактировать:

Если вы сериализуете ключ с помощью StringSerializer, вам нужно использовать это в вашей конфигурации Connect:

key.converter=org.apache.kafka.connect.storage.StringConverter

Вы можете установить его на рабочем (глобальное свойство, применяется ко всем запущенным на нем коннекторам) или только для этого коннектора (т. Е. Поместить его в свойства самого коннектора, оно переопределит настройки рабочего)

...