В настоящее время я работаю над интеграцией: источника коннектора Debezium MySQL (v0.9.0F), Kafka connect (confluent platform v5.1.2) и ES (v6.5.4) на стороне приемника.Исходный соединитель успешно может анализировать таблицы в MySQL (проверено в журналах kafka), но возникает ошибка ниже (с определенными таблицами и указанными столбцами) на стороне приемника (ES).
"type\":\"mapper_parsing_exception\",
\"reason\":\"Mapping definition for [column1] has unsupported parameters: [null_value : 1970-01-01T00:00:00Z]\"
column1
DDL выглядит следующим образом:
`column1` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
Однако в таблице нет данных, равных 1970-01-01T00:00:00Z
(не уверен, если это имеет значение)
Аналогично для столбца другой таблицы:
"type\":\"mapper_parsing_exception\",
\"reason\":\"Mapping definition for [column2] has unsupported parameters: [null_value : ---]\"
* DDL 1014 *:
`column2` char(3) COLLATE utf8_unicode_ci NOT NULL DEFAULT '---'
Стек вызовов ошибки:
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:253)
at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:65)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:257)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-02-22 15:48:40,217] ERROR WorkerSinkTask{id=stage_refdata_company_essink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.W
orkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot create mapping
<mapping of table explanation>
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:253)
at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:65)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:257)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
Данные представлены в таблице со значением по умолчанию:'---'
.
- Почему ES выдает ошибку при этих 2 значениях?
column2
четко указал char(3)
в качестве своего типа, следовательно, '---'
должно быть допустимым значением.
Не могли бы вы помочь устранить эти ошибки?
Как избежать этих ошибок в будущем для некоторых других пользовательских значений по умолчанию?