Я установил приемник Connector для Postgres к одному из узлов моего кластера Kakfka.Настройка выглядит следующим образом:
- 3 Zookeepers
- 3 Kafka Brokers
- 3 Реестр схемы
- 1 Kafka Connect
Я создал приемник, используя
curl -X POST -H "Content-Type: application/json" \
--data '{
"name": "nextiot-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://db_host:5432/nextiot",
"connection.user": "db_user",
"connection.password": "db_pass",
"auto.create": true,
"auto.evolve": true,
"topics": "nextiot"
}
}' http://10.0.1.70:8083/connectors
После вставки в контейнер реестра схемы я могу производить и принимать данные из команды kafka-avro-console-producer
Но когда я пытаюсьсделать отправку данных от клиента Клиент я получаю это:
{"name": "nextiot-sink", "connector": {"state": "RUNNING", "worker_id":"0.0.0.0:8083"‹,"tasks":[]"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Превышен допуск в обработчике ошибок \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError (RetryWithToleranceOperator.java:178) \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.ithtat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord (WorkerSinkTask.java:513) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages (WorkerSinkTask.java:490) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll (WorkerSinkTask.java:321) \ n \ tat org.apkafka.connect.runtime.WorkerSinkTask.iteration (WorkerSinkTask.java:225) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:193) \ n \ tat org.apache.kafconnect.runtime.WorkerTask.doRun (WorkerTask.java:175) \ n \ tat org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:219) \ n \ tat java.util.concurrent.Executors $RunnableAdapter.call (Executors.java:511) \ n \ tat java.util.concurrent.FutureTask.run (FutureTask.java:266) \ n \ tat java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.j):\ n \ tat java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) \ n \ tat java.lang.Thread.run (Thread.java:748) \ nПричиняется: org.apache.kafka.connect.errors.DataException: nextiot \ n \ tat io.confluent.connect.avro.AvroConverter.toConnectData (AvroConverter.java:98) \ n\ tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda $ convertAndTransformRecord $ 1 (WorkerSinkTask.java:513) \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorj.) \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError (RetryWithToleranceOperator.java:162) \ n \ t ... еще 13 \ nПричинено: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1 \ nПричиняется: org.apache.kafka.common.errors.SerializationException: Неизвестный магический байт! \ N "," id ": 0," worker_id ":" 0.0.0.0:8083"}]," type ":" sink "}
Это моя схема AVRO
{
"namespace": "example.avro",
"type": "record",
"name": "NEXTIOT",
"fields": [
{"name": "deviceid", "type": "string"},
{"name": "longitude", "type": "float"},
{"name": "latitude", "type": "float"}
]
}
Мой код Python для публикации данных:
import io
import random
import avro.schema
from avro.io import DatumWriter
from kafka import SimpleProducer
from kafka import KafkaClient
# To send messages synchronously
# KAFKA = KafkaClient('Broker URL:9092')
KAFKA = KafkaClient('BROKER URL')
PRODUCER = SimpleProducer(KAFKA)
# Kafka topic
TOPIC = "nextiot"
# Path to user.avsc avro schema
SCHEMA_PATH = "user.avsc"
SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())
for i in xrange(10):
writer = DatumWriter(SCHEMA)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write(*{"deviceid":"9098", "latitude": 90.34 , "longitude": 334.4}, encoder)
raw_bytes = bytes_writer.getvalue()
PRODUCER.send_messages(TOPIC, raw_bytes)
В Kafka Connect Connector возникает следующая ошибка:
{"name": "nextiot-sink", "connector": {"state": "RUNNING" "worker_id":" 0.0.0.0:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectExcepion: Превышено допустимое отклонение в обработчике ошибок \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError (RetryWithToleranceOperator.java:178) \ n \ tat org.apache.kafka.connect.RoleryWorWinterWords..execute (RetryWithToleranceOperator.java:104) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord (WorkerSinkTask.java:513) \ n \ tat org.apache.kafka.Tessink.vertconvert.un(WorkerSinkTask.java:490)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll (WorkerSinkTask.java:321) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration (.java: 225) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:193) \ n \ tatorg.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:175) \ n \ tat org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:219) \ n \ tat java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) \ n \ tat java.util.concurrent.FutureTask.run (FutureTask.java:266) \ n \ tat java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) \ n \ tat java.lang.Thread.run (Thread.java:748) \ nПричинено:org.apache.kafka.connect.errors.DataException: nextiot \ n \ tat io.confluent.connect.avro.AvroConverter.toConnectData (AvroConverter.java:98) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda $ convertAndTransformRecord $ 1 (WorkerSinkTask.java:513) \ n \ tat org.apache.kafka.connect.runtime.errors.runtime.errors.RetryWithToleranceOperator.execAndHandleError (RetryWithToleranceOperator.java:162)\n\t ... еще 13 \ nПричинено: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1 \ nПричинено: org.apache.kafka.common.errors.SerializationException: Неизвестный волшебный байт! \ n "," id ": 0," worker_id ":" 0.0.0.0:8083" enj],"type":"sink" enj