Я пытаюсь построить инфраструктуру, в которой мне нужно пересылать сообщения от одного kafka topi c наasticsearch и postgresql. Моя инфраструктура выглядит так, как показано на рисунке ниже, и все это работает на одном хосте. Logsta sh производит некоторую анонимность и некоторые изменения и отправляет документ обратно в kafka как json. Кафка должен затем переслать сообщение на PostgreSQL и Elasticsearch
Все работает нормально, примите соединение с postgresql, с которым я возникли проблемы.
Мои конфигурационные файлы выглядят следующим образом:
sink-quickstart-sqlite.properties
name=jdbc-test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#table.name.format=${topic}
topics=processed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable:true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable:true
connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=luka
insert.mode=upsert
pk.mode=kafka
pk_fields=__connect_topic,__connect_partition,__connect_offset
fields.whitelist=ident,auth,response,request,clientip
auto.create=true
auto.evolve=true
confluent- distrib.properties
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
quicstart-elasticsearch.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
#topics=test-elasticsearch-sink
topics=processed
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
schema.ignore=true
schemas.enable=false
Служба confluent-schema-registry работает .
Я получаю следующую ошибку после curl http://localhost: 8083 / connectors / jdb c -sink / status | jq
{
"name": "jdbc-sink",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.50.37:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "192.168.50.37:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
"
}
],
"type": "sink"
}
Это похоже на сообщение в моем "обработанном" topi c (сообщение в topi c однострочное, оно просто отформатировано):
{
"ROWTIME": 1587134287569,
"ROWKEY": "null",
"bytes": "4050",
"input": {
"type": "log"
},
"clientip": "156.226.170.95",
"@timestamp": "2020-04-17T14:38:06.346Z",
"timestamp": "17/Apr/2020:14:37:57 +0000",
"@version": "1",
"request": "/lists",
"ident": "536e605f097a92cb07c2a0a81f809f481c5af00d13305f0094057907792ce65e2a62b8ab8ba036f95a840504c3e2f05a",
"httpversion": "1.1",
"auth": "33a7f4a829adfaa60085eca1b84e0ae8f0aa2835d206ac765c22ad440e50d7ae462adafb13306aecfdcd6bd80b52b03f",
"agent": {
"ephemeral_id": "053b9c29-9038-4a89-a2b3-a5d8362460fe",
"version": "7.6.2",
"id": "50e21169-5aa0-496f-b792-3936e9c8de04",
"hostname": "HOSTNAME_OF_MY_AWS_INSTANCE",
"type": "filebeat"
},
"log": {
"offset": 707943,
"file": {
"path": "/home/ec2-user/log/apache.log"
}
},
"host": {
"name": "HOSTNAME_OF_MY_AWS_INSTANCE"
},
"verb": "DELETE",
"ecs": {
"version": "1.4.0"
},
"response": "503"
}
Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.