Ошибка десериализации Kafka Темы данных при извлечении из библиотеки Python Kafka - PullRequest
0 голосов
/ 19 декабря 2018

Я установил приемник Connector для Postgres к одному из узлов моего кластера Kakfka.Настройка выглядит следующим образом:

  • 3 Zookeepers
  • 3 Kafka Brokers
  • 3 Реестр схемы
  • 1 Kafka Connect

Я создал приемник, используя

curl -X POST -H "Content-Type: application/json" \
  --data '{
    "name": "nextiot-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://db_host:5432/nextiot",
        "connection.user": "db_user",
        "connection.password": "db_pass",
        "auto.create": true,
        "auto.evolve": true,
        "topics": "nextiot"
        }
    }' http://10.0.1.70:8083/connectors

После вставки в контейнер реестра схемы я могу производить и принимать данные из команды kafka-avro-console-producer

Но когда я пытаюсьсделать отправку данных от клиента Клиент я получаю это:

{"name": "nextiot-sink", "connector": {"state": "RUNNING", "worker_id":"0.0.0.0:8083"‹,"tasks":[]"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Превышен допуск в обработчике ошибок \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError (RetryWithToleranceOperator.java:178) \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.ithtat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord (WorkerSinkTask.java:513) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages (WorkerSinkTask.java:490) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll (WorkerSinkTask.java:321) \ n \ tat org.apkafka.connect.runtime.WorkerSinkTask.iteration (WorkerSinkTask.java:225) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:193) \ n \ tat org.apache.kafconnect.runtime.WorkerTask.doRun (WorkerTask.java:175) \ n \ tat org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:219) \ n \ tat java.util.concurrent.Executors $RunnableAdapter.call (Executors.java:511) \ n \ tat java.util.concurrent.FutureTask.run (FutureTask.java:266) \ n \ tat java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.j):\ n \ tat java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) \ n \ tat java.lang.Thread.run (Thread.java:748) \ nПричиняется: org.apache.kafka.connect.errors.DataException: nextiot \ n \ tat io.confluent.connect.avro.AvroConverter.toConnectData (AvroConverter.java:98) \ n\ tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda $ convertAndTransformRecord $ 1 (WorkerSinkTask.java:513) \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorj.) \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError (RetryWithToleranceOperator.java:162) \ n \ t ... еще 13 \ nПричинено: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1 \ nПричиняется: org.apache.kafka.common.errors.SerializationException: Неизвестный магический байт! \ N "," id ": 0," worker_id ":" 0.0.0.0:8083"}]," type ":" sink "}

Это моя схема AVRO

{ "namespace": "example.avro", "type": "record", "name": "NEXTIOT", "fields": [ {"name": "deviceid", "type": "string"}, {"name": "longitude", "type": "float"}, {"name": "latitude", "type": "float"} ] }

Мой код Python для публикации данных:

import io
import random
import avro.schema
from avro.io import DatumWriter
from kafka import SimpleProducer
from kafka import KafkaClient

# To send messages synchronously
# KAFKA = KafkaClient('Broker URL:9092')
KAFKA = KafkaClient('BROKER URL')

PRODUCER = SimpleProducer(KAFKA)

# Kafka topic
TOPIC = "nextiot"

# Path to user.avsc avro schema
SCHEMA_PATH = "user.avsc"
SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())

for i in xrange(10):
    writer = DatumWriter(SCHEMA)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write(*{"deviceid":"9098", "latitude":  90.34 , "longitude": 334.4}, encoder)
    raw_bytes = bytes_writer.getvalue()
    PRODUCER.send_messages(TOPIC, raw_bytes)

В Kafka Connect Connector возникает следующая ошибка:

{"name": "nextiot-sink", "connector": {"state": "RUNNING" "worker_id":" 0.0.0.0:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectExcepion: Превышено допустимое отклонение в обработчике ошибок \ n \ tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError (RetryWithToleranceOperator.java:178) \ n \ tat org.apache.kafka.connect.RoleryWorWinterWords..execute (RetryWithToleranceOperator.java:104) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord (WorkerSinkTask.java:513) \ n \ tat org.apache.kafka.Tessink.vertconvert.un(WorkerSinkTask.java:490)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll (WorkerSinkTask.java:321) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration (.java: 225) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:193) \ n \ tatorg.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:175) \ n \ tat org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:219) \ n \ tat java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) \ n \ tat java.util.concurrent.FutureTask.run (FutureTask.java:266) \ n \ tat java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) \ n \ tat java.lang.Thread.run (Thread.java:748) \ nПричинено:org.apache.kafka.connect.errors.DataException: nextiot \ n \ tat io.confluent.connect.avro.AvroConverter.toConnectData (AvroConverter.java:98) \ n \ tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda $ convertAndTransformRecord $ 1 (WorkerSinkTask.java:513) \ n \ tat org.apache.kafka.connect.runtime.errors.runtime.errors.RetryWithToleranceOperator.execAndHandleError (RetryWithToleranceOperator.java:162)\n\t ... еще 13 \ nПричинено: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1 \ nПричинено: org.apache.kafka.common.errors.SerializationException: Неизвестный волшебный байт! \ n "," id ": 0," worker_id ":" 0.0.0.0:8083" enj],"type":"sink" enj

1 Ответ

0 голосов
/ 19 декабря 2018

Я не так много сделал с различными клиентами Python, но эта магическая ошибка байта почти наверняка связана с тем, что отправляемая вами информация может быть действительной, но если вы хотите интегрироваться с реестром схемы, полезная нагрузка должна быть вдругой формат (дополнительная информация заголовка, задокументированная здесь https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html поиск формата проводника или магического байта).Лично я бы попытался использовать клиент Python Kafka от Confluent-- https://github.com/confluentinc/confluent-kafka-python - в нем есть примеры работы с Avro и реестром схемы.

...