Ошибка десериализации сообщения Avro на Sink Connector - PullRequest
0 голосов
/ 16 января 2020

Я получаю сообщение об ошибке десериализации сообщения Avro при попытке получить сообщение avro из topi c и выгрузить их в postgres DB.

Вот моя конфигурация производителя:

spring
  Kafka
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081

Конфигурация коннектора раковины:

{
"name": "jdbc_source_postgres_avro",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgres://localhost:5432/kafka-test",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.user": "postgres",
    "connection.password": "password",
    "topics": "docker-avro-topic",
    "auto.create": "true",
    "auto.offset.reset": "latest",
    "name": "jdbc_source_postgres_avro"
}

docker -изображение для подключения:

connect:
image: cnfldemos/kafka-connect-datagen:0.1.3-5.3.1
hostname: connect
container_name: connect
depends_on:
  - zookeeper
  - broker
  - schema-registry
ports:
  - "8083:8083"
environment:
  CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
  CONNECT_REST_ADVERTISED_HOST_NAME: connect
  CONNECT_REST_PORT: 8083
  CONNECT_GROUP_ID: compose-connect-group
  CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
  CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
  CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
  CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
  CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
  CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

И, наконец, журнал ошибок:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded
in error handler    at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748) Caused by:
org.apache.kafka.connect.errors.DataException: Failed to deserialize
data for topic docker-avro-topic to Avro:   at
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)...
13 more Caused by:
**org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 101 Caused by:
java.net.ConnectException: Connection refused (Connection refused)  at
java.net.PlainSocketImpl.socketConnect(Native Method)**     at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)   at
java.net.Socket.connect(Socket.java:589)    at
sun.net.NetworkClient.doConnect(NetworkClient.java:175)     at
sun.net.www.http.HttpClient.openServer(HttpClient.java:463)     at
sun.net.www.http.HttpClient.openServer(HttpClient.java:558)     at
sun.net.www.http.HttpClient.<init>(HttpClient.java:242)     at
sun.net.www.http.HttpClient.New(HttpClient.java:339)    at
sun.net.www.http.HttpClient.New(HttpClient.java:357     at
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    at
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
    at
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
    at
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)

1 Ответ

1 голос
/ 16 января 2020

Вы настроили Kafka Connect для поиска реестра схемы, работающего в том же контейнере:

"value.converter.schema.registry.url": "http://localhost:8081",

, но это не так, следовательно:

java.net.ConnectException: Connection refused (Connection refused)

Так как вы ' Если вы уже указали конфигурацию реестра схемы в переменных среды Kafka Connect docker, то, если у вас действительно есть реестр реестра, работающий в контейнере, который можно преобразовать как schema-registry из контейнера Kafka Connect, вы можете просто удалить schema.registry.url строки конфигурации из фактической конфигурации Соединителя.

...