Коннектор Kafka Elasticsearch themes.regex - PullRequest
0 голосов
/ 04 августа 2020

Я использовал версию confluent kafka 5.1.14

Я пытался запустить коннектор Kafka Elasticsearch для отправки данных из Kafka в Elasticsearch. Я протестировал эту конфигурацию в автономном режиме, и все работает нормально. вот автономная конфигурация для стока elasticsearch

name=elasticsearch-sink-standalone
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics.regex=^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$
connection.url=http://elasticsearch:9200
type.name=_doc
key.ignore=true
schema.ignore=true

Но когда я использую ту же настройку для создания стока в распределенном режиме, это будет иметь ошибку. Это тело сообщения запроса при создании приемника

{
 "name" : "connector-test",
 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics.regex" : "^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$", 
  "connection.url" : "http://elasitcsearch:9200",
  "type.name" : "_doc",
  "key.ignore" : "true",
  "schema.ignore" : "true"
 }
}

Это ошибка, которую я получаю

[2020-08-04 18:18:35,911] ERROR WorkerSinkTask{id=connector-test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4043)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2572)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-08-04 18:18:35,912] ERROR WorkerSinkTask{id=connector-test-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

Я могу использовать распределенный режим с другим регулярным выражением, поэтому я предполагаю ошибку вызвано регулярным выражением. Но что меня смутило, так это то, что автономный режим может работать с этим регулярным выражением.

И автономный, и распределенный режим работают с (я не уверен, помогает ли эта информация)

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Пожалуйста, сообщите мне, если мне нужно предоставить дополнительную информацию. Спасибо!

1 Ответ

0 голосов
/ 04 августа 2020

Я не думаю, что проблема здесь в регулярном выражении. Трассировка стека ясно показывает, что сообщение пытается сериализоваться после опроса из исходной системы (в данном случае это Kafka, поскольку коннектор является приемником) и возникает ошибка something-2- json. Вам необходимо понять, в каком формате эти записи вызывают ошибку преобразования.

Регулярное выражение используется, чтобы узнать, из каких тем соединитель должен читать записи, и, похоже, это работает должным образом.

...