Я хочу извлечь все данные с моего PostgreSQL на SQL Сервер, используя Kafka Connect и JDB C. Я хочу избавиться от некоторых запросов, чтобы проверить, могу ли я сделать поток данных, используя только insert.mode=insert
.
Это моя исходная конфигурация:
name=debezium_pg_connectors
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=pgoutput
database.hostname=XXX.XXX.XXX.XX
database.port=5432
database.user=XXXXXX
database.password=XXXXXX
database.dbname=XXXXX
database.server.name=XXXXX
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=XXXXXX
table.whitelist=XXXXXXX
time.precision.mode=connect
transforms=unwrap
transforms.unwrap.type= io.debezium.transforms.ExtractNewRecordState
Это моя конфигурация приемника:
name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=pj_user
connection.url=<connection>
auto.create=true
auto.evolve=true
insert.mode=insert
pk.mode=record_key
table.name.format=<table>
transforms=unwrap,route
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement = $3
fields.whitelist=...
На моем SQL сервере у меня есть автоматически сгенерированный столбец с именем key
с типом данных uniqueidentifier
и в качестве первичного ключа. Однако каждый раз, когда я пытался потерять свои данные, случается сбой:
[2020-03-03 12:45:11,487] ERROR WorkerSinkTask{id=jdbc-sink-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: java.sql.BatchUpdateException: Cannot insert the value NULL into column 'key', table '<table>'; column does not allow nulls. INSERT fails.
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Cannot insert the value NULL into column 'key', table '<table>'; column does not allow nulls. INSERT fails.
... 12 more
Если у кого-то есть идеи, чтобы помочь мне, любая помощь и совет приветствуются. спасибо