Доступен ли приемник JDB C на SQL сервере, чтобы пропустить непредусмотренный столбец из моего источника? - PullRequest
1 голос
/ 03 марта 2020

Я хочу извлечь все данные с моего PostgreSQL на SQL Сервер, используя Kafka Connect и JDB C. Я хочу избавиться от некоторых запросов, чтобы проверить, могу ли я сделать поток данных, используя только insert.mode=insert.

Это моя исходная конфигурация:

name=debezium_pg_connectors
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=pgoutput
database.hostname=XXX.XXX.XXX.XX
database.port=5432
database.user=XXXXXX
database.password=XXXXXX
database.dbname=XXXXX
database.server.name=XXXXX
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=XXXXXX
table.whitelist=XXXXXXX
time.precision.mode=connect
transforms=unwrap
transforms.unwrap.type= io.debezium.transforms.ExtractNewRecordState


Это моя конфигурация приемника:

name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=pj_user
connection.url=<connection>
auto.create=true
auto.evolve=true
insert.mode=insert
pk.mode=record_key
table.name.format=<table>
transforms=unwrap,route
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement = $3
fields.whitelist=...

На моем SQL сервере у меня есть автоматически сгенерированный столбец с именем key с типом данных uniqueidentifier и в качестве первичного ключа. Однако каждый раз, когда я пытался потерять свои данные, случается сбой:

[2020-03-03 12:45:11,487] ERROR WorkerSinkTask{id=jdbc-sink-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: java.sql.BatchUpdateException: Cannot insert the value NULL into column 'key', table '<table>'; column does not allow nulls. INSERT fails.

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Cannot insert the value NULL into column 'key', table '<table>'; column does not allow nulls. INSERT fails.

    ... 12 more

Если у кого-то есть идеи, чтобы помочь мне, любая помощь и совет приветствуются. спасибо

1 Ответ

0 голосов
/ 03 марта 2020

Убедитесь, что ваш столбец key имеет значение по умолчанию:

ALTER TABLE [tableName] ADD DEFAULT NEWSEQUENTIALID() FOR key
...