Пересылка сообщений от Kafka в Elasticsearch и Postgresql - PullRequest
0 голосов
/ 17 апреля 2020

Я пытаюсь построить инфраструктуру, в которой мне нужно пересылать сообщения от одного kafka topi c наasticsearch и postgresql. Моя инфраструктура выглядит так, как показано на рисунке ниже, и все это работает на одном хосте. Logsta sh производит некоторую анонимность и некоторые изменения и отправляет документ обратно в kafka как json. Кафка должен затем переслать сообщение на PostgreSQL и Elasticsearch

enter image description here

Все работает нормально, примите соединение с postgresql, с которым я возникли проблемы.

Мои конфигурационные файлы выглядят следующим образом:

sink-quickstart-sqlite.properties

name=jdbc-test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#table.name.format=${topic}
topics=processed

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable:true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable:true

connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=luka
insert.mode=upsert

pk.mode=kafka

pk_fields=__connect_topic,__connect_partition,__connect_offset
fields.whitelist=ident,auth,response,request,clientip
auto.create=true
auto.evolve=true

confluent- distrib.properties

group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

quicstart-elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
#topics=test-elasticsearch-sink
topics=processed
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
schema.ignore=true
schemas.enable=false

Служба confluent-schema-registry работает .

Я получаю следующую ошибку после curl http://localhost: 8083 / connectors / jdb c -sink / status | jq

{
  "name": "jdbc-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.50.37:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "192.168.50.37:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
                    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
                    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
                    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
                    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                    at java.base/java.lang.Thread.run(Thread.java:834)
                Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
                    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
                    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
                    ... 13 more
"
    }
  ],
  "type": "sink"
}

Это похоже на сообщение в моем "обработанном" topi c (сообщение в topi c однострочное, оно просто отформатировано):

{
    "ROWTIME": 1587134287569,
    "ROWKEY": "null",
    "bytes": "4050",
    "input": {
        "type": "log"
    },
    "clientip": "156.226.170.95",
    "@timestamp": "2020-04-17T14:38:06.346Z",
    "timestamp": "17/Apr/2020:14:37:57 +0000",
    "@version": "1",
    "request": "/lists",
    "ident": "536e605f097a92cb07c2a0a81f809f481c5af00d13305f0094057907792ce65e2a62b8ab8ba036f95a840504c3e2f05a",
    "httpversion": "1.1",
    "auth": "33a7f4a829adfaa60085eca1b84e0ae8f0aa2835d206ac765c22ad440e50d7ae462adafb13306aecfdcd6bd80b52b03f",
    "agent": {
        "ephemeral_id": "053b9c29-9038-4a89-a2b3-a5d8362460fe",
        "version": "7.6.2",
        "id": "50e21169-5aa0-496f-b792-3936e9c8de04",
        "hostname": "HOSTNAME_OF_MY_AWS_INSTANCE",
        "type": "filebeat"
    },
    "log": {
        "offset": 707943,
        "file": {
            "path": "/home/ec2-user/log/apache.log"
        }
    },
    "host": {
        "name": "HOSTNAME_OF_MY_AWS_INSTANCE"
    },
    "verb": "DELETE",
    "ecs": {
        "version": "1.4.0"
    },
    "response": "503"
}

Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.

1 Ответ

1 голос
/ 17 апреля 2020

Ваша ошибка здесь:

DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Поскольку это JDB C Sink, вы должны предоставить схему для ваших данных. Если у вас есть возможность, я бы предложил вам использовать Avro. Если нет, вы должны структурировать свои JSON данные в соответствии с требованиями Kafka Connect.

Дополнительная информация: https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s

...