Deleting a row in the sink database does not work after applying SMTs in Kafka
April 24, 2020

After applying SMTs (for example, unwrap and rename), deleting a row in the sink database fails. Without the SMTs, row deletion in the sink database works fine.

Java version :: 1.8.0_242

Kafka version :: kafka_2.12-2.5.0

Source database :: Oracle Database 11g Express Edition Release 11.2.0.2.0 - 64-bit

Sink database :: Microsoft SQL Server 2017 (RTM-CU20) (KB4541283) - 14.0.3294.2 (X64) Developer Edition (64-bit) on Linux (Ubuntu 16.04.6 LTS)

Sink connector :: confluentinc-kafka-connect-jdbc-5.4.1

configuration for Sink connector ::
name=mssql-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=POC.HR.EMPLOYEES
connection.url=jdbc:sqlserver://localhost:1433;databaseName=casewise
connection.user=SA
connection.password=1Secure*Password1
mode=bulk
insert.mode=upsert
table.name.format=casewise.dbo.employees
auto.create=true
pk.fields=empid
pk.mode=record_key
delete.enabled=true
transforms=unwrap,RenameField,RenameValue
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.operation.header=false
transforms.unwrap.delete.handling.mode=none
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.RenameField.renames=EMPLOYEE_ID:empid
transforms.RenameValue.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameValue.renames=EMPLOYEE_ID:empid,FIRST_NAME:fstnm
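
For reference, the intent of this transform chain can be modeled in a few lines of Python. This is only a simplified sketch, not the actual Kafka Connect or Debezium code. The helpers `rename` and `apply_chain` are hypothetical, and the sketch assumes that the unwrap step passes a delete event through with a null value when `after` is empty.

```python
# Simplified model of the intended transform chain (unwrap, then rename).
# Hypothetical helpers for illustration; this is NOT the real Connect code.
KEY_RENAMES = {"EMPLOYEE_ID": "empid"}
VALUE_RENAMES = {"EMPLOYEE_ID": "empid", "FIRST_NAME": "fstnm"}


def rename(fields, renames):
    """Return a copy of `fields` with the configured renames applied."""
    if fields is None:  # null value (tombstone-like record): nothing to rename
        return None
    return {renames.get(name, name): value for name, value in fields.items()}


def apply_chain(key, envelope):
    """Unwrap the envelope to its 'after' state, then rename key and value fields.

    Assumption: a delete event (after is None) is passed on as a null value.
    """
    value = envelope["after"] if envelope is not None else None
    return rename(key, KEY_RENAMES), rename(value, VALUE_RENAMES)


# Trimmed-down version of the delete event from this question.
delete_key = {"EMPLOYEE_ID": 243}
delete_envelope = {
    "op": "d",
    "before": {"EMPLOYEE_ID": 243, "FIRST_NAME": "four"},
    "after": None,
}

new_key, new_value = apply_chain(delete_key, delete_envelope)
print(new_key, new_value)  # {'empid': 243} None
```

Under these assumptions the sink would receive the key `{'empid': 243}` with a null value, which is the shape that `delete.enabled=true` with `pk.mode=record_key` treats as a delete.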

Key ::

{'schema': {'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'doc': 'Primary key of employees table.', 'field': 'EMPLOYEE_ID'}], 'optional': True, 'name': 'POC.HR.EMPLOYEES.Key'}, 'payload': {'EMPLOYEE_ID': 243}}

Value ::

{'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'op'}, {'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'doc': 'Primary key of employees table.', 'field': 'EMPLOYEE_ID'}, {'type': 'string', 'optional': True, 'doc': 'First name of the employee. A not null column.', 'field': 'FIRST_NAME'}, {'type': 'string', 'optional': False, 'doc': 'Last name of the employee. A not null column.', 'field': 'LAST_NAME'}, {'type': 'string', 'optional': False, 'doc': 'Email id of the employee', 'field': 'EMAIL'}, {'type': 'string', 'optional': True, 'doc': 'Phone number of the employee; includes country code and area code', 'field': 'PHONE_NUMBER'}, {'type': 'int32', 'optional': False, 'name': 'org.apache.kafka.connect.data.Date', 'version': 1, 'doc': 'Date when the employee started on this job. A not null column.', 'field': 'HIRE_DATE'}, {'type': 'string', 'optional': False, 'doc': 'Current job of the employee; foreign key to job_id column of the\njobs table. A not null column.', 'field': 'JOB_ID'}, {'type': 'double', 'optional': True, 'doc': 'Monthly salary of the employee. Must be greater\nthan zero (enforced by constraint emp_salary_min)', 'field': 'SALARY'}, {'type': 'double', 'optional': True, 'doc': 'Commission percentage of the employee; Only employees in sales\ndepartment elgible for commission percentage', 'field': 'COMMISSION_PCT'}, {'type': 'int32', 'optional': True, 'doc': 'Manager id of the employee; has same domain as manager_id in\ndepartments table. 
Foreign key to employee_id column of employees table.\n(useful for reflexive joins and CONNECT BY query)', 'field': 'MANAGER_ID'}, {'type': 'int16', 'optional': True, 'doc': 'Department id where employee works; foreign key to department_id\ncolumn of the departments table', 'field': 'DEPARTMENT_ID'}], 'optional': True, 'name': 'POC.HR.EMPLOYEES.Value', 'field': 'before'}, {'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'doc': 'Primary key of employees table.', 'field': 'EMPLOYEE_ID'}, {'type': 'string', 'optional': True, 'doc': 'First name of the employee. A not null column.', 'field': 'FIRST_NAME'}, {'type': 'string', 'optional': False, 'doc': 'Last name of the employee. A not null column.', 'field': 'LAST_NAME'}, {'type': 'string', 'optional': False, 'doc': 'Email id of the employee', 'field': 'EMAIL'}, {'type': 'string', 'optional': True, 'doc': 'Phone number of the employee; includes country code and area code', 'field': 'PHONE_NUMBER'}, {'type': 'int32', 'optional': False, 'name': 'org.apache.kafka.connect.data.Date', 'version': 1, 'doc': 'Date when the employee started on this job. A not null column.', 'field': 'HIRE_DATE'}, {'type': 'string', 'optional': False, 'doc': 'Current job of the employee; foreign key to job_id column of the\njobs table. A not null column.', 'field': 'JOB_ID'}, {'type': 'double', 'optional': True, 'doc': 'Monthly salary of the employee. Must be greater\nthan zero (enforced by constraint emp_salary_min)', 'field': 'SALARY'}, {'type': 'double', 'optional': True, 'doc': 'Commission percentage of the employee; Only employees in sales\ndepartment elgible for commission percentage', 'field': 'COMMISSION_PCT'}, {'type': 'int32', 'optional': True, 'doc': 'Manager id of the employee; has same domain as manager_id in\ndepartments table. 
Foreign key to employee_id column of employees table.\n(useful for reflexive joins and CONNECT BY query)', 'field': 'MANAGER_ID'}, {'type': 'int16', 'optional': True, 'doc': 'Department id where employee works; foreign key to department_id\ncolumn of the departments table', 'field': 'DEPARTMENT_ID'}], 'optional': True, 'name': 'POC.HR.EMPLOYEES.Value', 'field': 'after'}, {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'version'}, {'type': 'string', 'optional': False, 'field': 'connector'}, {'type': 'int64', 'optional': False, 'name': 'org.apache.kafka.connect.data.Timestamp', 'version': 1, 'field': 'ts_ms'}, {'type': 'string', 'optional': True, 'field': 'txId'}, {'type': 'int64', 'optional': False, 'field': 'scn'}, {'type': 'string', 'optional': False, 'field': 'schema'}, {'type': 'string', 'optional': False, 'field': 'table'}, {'type': 'string', 'optional': True, 'field': 'user'}], 'optional': False, 'name': 'com.github.thake.logminer.kafka.connect.Source', 'field': 'source'}, {'type': 'int64', 'optional': False, 'name': 'org.apache.kafka.connect.data.Timestamp', 'version': 1, 'field': 'ts_ms'}], 'optional': False, 'name': 'POC.HR.EMPLOYEES.Envelope'}, 'payload': {'op': 'd', 'before': {'EMPLOYEE_ID': 243, 'FIRST_NAME': 'four', 'LAST_NAME': 'five', 'EMAIL': 'dontknowcause@hello.in', 'PHONE_NUMBER': '650.124.1334', 'HIRE_DATE': 18375, 'JOB_ID': 'ST_CLERK', 'SALARY': 2420.0, 'COMMISSION_PCT': None, 'MANAGER_ID': 120, 'DEPARTMENT_ID': 50}, 'after': None, 'source': {'version': '1.0', 'connector': 'logminer-kafka-connect', 'ts_ms': 1587673616000, 'txId': '06001600A3010000', 'scn': 488180, 'schema': 'HR', 'table': 'EMPLOYEES', 'user': 'HR'}, 'ts_ms': 1587673618504}}

Status :: failed

Logs ::

INFO Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='FIRST_NAME', isPrimaryKey=false}, SinkRecordField{schema=Schema{INT32}, name='EMPLOYEE_ID', isPrimaryKey=false}] among column names [PHONE_NUMBER, fstnm, COMMISSION_PCT, MANAGER_ID, LAST_NAME, EMAIL, JOB_ID, SALARY, DEPARTMENT_ID, empid, HIRE_DATE] (io.confluent.connect.jdbc.sink.DbStructure:223)
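
The two fields named in this INFO line are exactly the ones the rename transforms were supposed to replace. A small sketch makes that visible, assuming the record reaching the sink still carried the original field names from the `before` struct shown above. The helper `missing_fields` is hypothetical and is not the connector's code.

```python
# Field names from the 'before' struct of the delete event in this question.
record_fields = [
    "EMPLOYEE_ID", "FIRST_NAME", "LAST_NAME", "EMAIL", "PHONE_NUMBER",
    "HIRE_DATE", "JOB_ID", "SALARY", "COMMISSION_PCT", "MANAGER_ID",
    "DEPARTMENT_ID",
]

# Column names of the sink table, as listed in the INFO log line.
table_columns = {
    "PHONE_NUMBER", "fstnm", "COMMISSION_PCT", "MANAGER_ID", "LAST_NAME",
    "EMAIL", "JOB_ID", "SALARY", "DEPARTMENT_ID", "empid", "HIRE_DATE",
}


def missing_fields(fields, columns):
    """Return the record fields that have no matching column in the table."""
    return [name for name in fields if name not in columns]


print(missing_fields(record_fields, table_columns))
# ['EMPLOYEE_ID', 'FIRST_NAME']
```

The result matches the fields reported in the log, which suggests that for the delete event the renames `EMPLOYEE_ID:empid` and `FIRST_NAME:fstnm` were not applied to the value the sink inspected.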

ERROR WorkerSinkTask{id=mssql-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER "casewise"."dbo"."employees" to add missing field SinkRecordField{schema=Schema{INT32}, name='EMPLOYEE_ID', isPrimaryKey=false}, as it is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask:566)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER "casewise"."dbo"."employees" to add missing field SinkRecordField{schema=Schema{INT32}, name='EMPLOYEE_ID', isPrimaryKey=false}, as it is not optional and does not have a default value
        at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:134)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:74)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:120)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask
...
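
The rule stated in the error message (a missing field can only be added if it is optional or has a default value) can be sketched as follows. The function `can_be_added` is a hypothetical illustration of that rule, not the connector's implementation. The `optional` flags are taken from the value schema in this question.

```python
# The two missing fields, with 'optional' flags copied from the value schema above.
missing = [
    {"name": "FIRST_NAME", "type": "string", "optional": True},
    {"name": "EMPLOYEE_ID", "type": "int32", "optional": False},
]


def can_be_added(field):
    """Rule from the error message: optional, or has a default value."""
    return field["optional"] or field.get("default") is not None


for field in missing:
    print(field["name"], can_be_added(field))
# FIRST_NAME True
# EMPLOYEE_ID False
```

This is why the task fails on `EMPLOYEE_ID` rather than on `FIRST_NAME`: the former is declared non-optional and has no default.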