Я хочу отправить данные, отправленные в тему, в базу данных postgresql.Поэтому я следую этому руководству и настроил файл свойств следующим образом:
name=transaction-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=transactions
connection.url=jdbc:postgresql://localhost:5432/db
connection.user=db-user
connection.password=
auto.create=true
insert.mode=insert
table.name.format=transaction
pk.mode=none
Я запускаю соединитель с
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-quickstart-postgresql.properties
Соединитель приемника созданно не запускается из-за этой ошибки:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Схема находится в avro-формате и зарегистрирована, и я могу отправлять (производить) сообщения в тему и читать (потреблять) из нее.Но я не могу отправить его в базу данных.
Это мой ./etc/schema-registry/connect-avro-standalone.properties
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Это продюсер, подающий тему с помощью java-api:
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
try (KafkaProducer<String, Transaction> producer = new KafkaProducer<>(properties)) {
Transaction transaction = new Transaction();
transaction.setFoo("foo");
transaction.setBar("bar");
UUID uuid = UUID.randomUUID();
final ProducerRecord<String, Transaction> record = new ProducerRecord<>(TOPIC, uuid.toString(), transaction);
producer.send(record);
}
Я проверяю, что данные правильно сериализованы и десериализованы с использованием
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic transactions \
--from-beginning --max-messages 1
База данных запущена и работает.