My sink connector properties:
name=sink-oracle
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# The topics to consume from - required for sink connectors like this one
topics=ersin_test
# Configuration specific to the JDBC sink connector.
# We want to connect to an Oracle database and auto-create tables.
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connection.url=jdbc:oracle:thin:@10.0.0.0:123/abc
connection.user=ersin
connection.password=ersin!
table.name.format=ERSIN_TEST
auto.create=true
delete.enabled=true
pk.mode=record_key
pk.fields=ID
insert.mode=upsert
Error:
INFO Checking Oracle dialect for existence of table "ERSIN_TEST" (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect:536)
[2020-04-15 00:31:44,982] INFO Using Oracle dialect table "ERSIN_TEST" absent (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect:544)
[2020-04-15 00:31:44,982] INFO Creating table with sql: CREATE TABLE "ERSIN_TEST" (
"ID" CLOB NOT NULL,
"PRODUCT" CLOB NULL,
"QUANTITY" NUMBER(10,0) NULL,
"PRICE" NUMBER(10,0) NULL,
PRIMARY KEY("ID")) (io.confluent.connect.jdbc.sink.DbStructure:92)
[2020-04-15 00:31:44,995] WARN Create failed, will attempt amend if table already exists (io.confluent.connect.jdbc.sink.DbStructure:63)
java.sql.SQLException: ORA-02329: column of datatype LOB cannot be unique or a primary key
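If I read the DDL correctly, it hints at where the CLOB comes from. In the log above the string field PRODUCT becomes CLOB and the int32 fields become NUMBER(10,0), so a CLOB "ID" suggests the primary key column is built from a string. With pk.mode=record_key and StringConverter that would be the record key, not the int32 ID in the value. The sketch below only restates the mapping seen in the log; the names OBSERVED_ORACLE_TYPES and column_ddl are hypothetical, and this is not the connector's actual code.

```python
# Type mapping as observed in the CREATE TABLE statement logged above.
# This is an illustration, not the JDBC sink connector's implementation.
OBSERVED_ORACLE_TYPES = {
    "string": "CLOB",
    "int32": "NUMBER(10,0)",
}


def column_ddl(name, connect_type, optional):
    """Render one column definition in the style of the logged DDL."""
    nullability = "NULL" if optional else "NOT NULL"
    return f'"{name}" {OBSERVED_ORACLE_TYPES[connect_type]} {nullability}'


# With StringConverter the record key is a string, so a key-derived ID is a CLOB.
key_derived_id = column_ddl("ID", "string", optional=False)
# The ID declared in the value schema is int32 and would map to a NUMBER.
value_derived_id = column_ddl("ID", "int32", optional=False)

print(key_derived_id)
print(value_derived_id)
```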
JSON data:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "ID",
        "type": "int32",
        "optional": false
      },
      {
        "field": "PRODUCT",
        "type": "string",
        "optional": true
      },
      {
        "field": "QUANTITY",
        "type": "int32",
        "optional": true
      },
      {
        "field": "PRICE",
        "type": "int32",
        "optional": true
      }
    ],
    "optional": true,
    "name": "myrecord"
  },
  "payload": {
    "ID": 1071,
    "PRODUCT": "ersin",
    "QUANTITY": 1071,
    "PRICE": 1453
  }
}
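JsonConverter with schemas.enable=true parses strict JSON (double quotes, lowercase true/false) and looks up payload values by the field names declared in the schema, so a quick sanity check before producing may help. A minimal sketch, with a hypothetical helper check_envelope and made-up sample fields:

```python
import json


def check_envelope(envelope):
    """Return (payload keys not in the schema, schema fields not in the payload)."""
    schema_fields = {f["field"] for f in envelope["schema"]["fields"]}
    payload_keys = set(envelope["payload"])
    return sorted(payload_keys - schema_fields), sorted(schema_fields - payload_keys)


# json.loads rejects single-quoted strings and Python-style True/False,
# so parsing the message is already a first validity check.
envelope = json.loads("""
{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "ID", "type": "int32", "optional": false},
      {"field": "QTY", "type": "int32", "optional": true}
    ],
    "optional": true,
    "name": "example"
  },
  "payload": {"ID": 1, "QTI": 5}
}
""")

# The sample payload deliberately misspells QTY as QTI.
unknown, missing = check_envelope(envelope)
print("not in schema:", unknown)
print("not in payload:", missing)
```

A misspelled payload key shows up in both lists: once as an unknown key and once as a missing schema field.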
Python code:
producer.send(topic, key=b'1071',
              value=json.dumps(v, default=json_util.default).encode('utf-8'))
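One possible direction, untested here: since delete.enabled=true requires pk.mode=record_key, the key itself would have to carry a numeric type. That would mean switching key.converter to org.apache.kafka.connect.json.JsonConverter (an assumption, not what the config above uses) and sending the key in the same schema/payload envelope as the value. The helper schema_key below is hypothetical.

```python
import json


def schema_key(record_id):
    """Wrap an integer key in a schema/payload envelope.

    Assumes key.converter is JsonConverter with schemas.enable=true.
    """
    envelope = {
        "schema": {"type": "int32", "optional": False},
        "payload": record_id,
    }
    return json.dumps(envelope).encode("utf-8")


key_bytes = schema_key(1071)
print(key_bytes)
```

The resulting bytes would be passed as key= to producer.send in place of b'1071'.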
How can I solve this problem?
Thanks in advance.