Проблема интеграции PostgreSQL и Kafka Connect - PullRequest
0 голосов
/ 07 октября 2018

Я тестирую соединитель JDBC Sink для вывода записей из Kafka в PostgreSQL.Вот конфигурация Соединителя:

{
    "name": "jdbc-sink-postgresql-1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "role",
        "connection.url": "jdbc:postgresql://localhost:5432/postgres?user=&password=",
        "auto.create": "false",                                                   
        "insert.mode": "upsert",
        "mode":"incrementing",
        "table.name.format":"role",
        "pk.mode":"record_value",
        "pk.fields":"role_id"
    }
}

Когда я запускаю соединитель, я получаю следующее исключение:

java.sql.BatchUpdateException: Batch entry 1 INSERT INTO "role" ("role_id","role_name") VALUES (123,'admin') ON CONFLICT ("role_id") DO UPDATE SET "role_name"=EXCLUDED."role_name" was aborted.  
   Call getNextException to see the cause.
   at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2778))

Есть ли какие-либо указатели относительно того, что мне здесь не хватает?Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.

1 Ответ

0 голосов
/ 07 октября 2018

Итак, проблема была со столом.Вот как я сначала создал таблицу:

CREATE TABLE role(
 role_id int PRIMARY KEY,
 role_name VARCHAR (255) UNIQUE NOT NULL
);

Тестовые данные в теме выглядели так:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic role --property schema.registry.url=http://localhost:8081/  --property value.schema='{"type":"record","name":"myRecord","fields": [{"name": "role_id","type": "int"},{"name": "role_name","type": "string"}]}' --key-serializer org.apache.kafka.common.serialization.StringSerializer --value-serializer io.confluent.kafka.serializers.KafkaAvroSerializer --property print.key=true
{"role_id":122, "role_name":"admin"}
{"role_id":123, "role_name":"admin"}
{"role_id":124, "role_name":"admin"}
{"role_id":125, "role_name":"admin"}
{"role_id":126, "role_name":"admin"}

Итак, когда мои тестовые данные снова имели то же значение для поля role_nameи снова он нарушил ограничение уникальности и, следовательно, ошибку.

Что все, что я сделал?

Я удалил таблицу.

Создал новую таблицу без ограничения уникального ключа, и вышеприведенные данные были переданы вPostgreSQL без проблем.

...