Десериализация авро сообщения - PullRequest
2 голосов
/ 02 февраля 2020

Я развернул Кафку с здесь . Также я добавил в docker-compose.yml Postgres контейнер следующим образом:

postgres:
    image: postgres
    hostname: kafka-postgres
    container_name: kafka-postgres
    depends_on:
      - ksql-server
      - broker
      - schema-registry
      - connect
    ports:
      - 5432:5432

Создал topi c просмотров страниц.

Далее я создал DatagenConnector с настройками и запустил его.

{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "iterations": "999999999",
  "quickstart": "pageviews"
} 

Насколько я вижу, коннектор определил схему для topi c:

{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "pageid",
      "type": "string"
    }
  ],
  "connect.name": "ksql.pageviews"
} 

Мой следующий шаг - создать JdbcSinkConnector, который будет передавать данные из Kafka topi c до Postgres таблицы. Это сработало. Настройки коннектора:

{
  "name": "from-kafka-to-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "pageviews"
  ],
  "connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
  "connection.user": "postgres",
  "connection.password": "********",
  "auto.create": "true",
  "auto.evolve": "true"
}

Затем я сам пытаюсь отправить сообщения на этот топи c. Но произошла ошибка с ошибкой:

[2020-02-01 21: 16: 11,750] ОШИБКА Обнаружена ошибка в задаче to-pg-0. Выполнение этапа 'VALUE_CONVERTER' с классом 'io.confluent.connect.avro.AvroConverter', где используемой записью является {topic = 'viewviews', partition = 0, offset = 23834, timestamp = 1580591160374, timestampType = CreateTime}. (org. apache .kafka.connect.runtime.errors.LogReporter) org. apache .kafka.connect.errors.DataException: не удалось десериализовать данные для topi c просмотров страниц в Avro: в io.confluent.connect .avro.AvroConverter.toConnectData (AvroConverter. java: 110) в орг. apache .kafka.connect.runtime.WorkerSinkTask.lambda $ convertAndTransformRecord $ 1 (WorkerSinkTask. java: 487) в орг. kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry (RetryWithToleranceOperator. java: 128) at or. apache .kafka.connect.runtime.errors.RetryWithToleranceOperator.execute (RetryWithToleranceOperator. java: 104) в орг. org. apache .kafka.connect.runtime.WorkerSinkTask.convertMessages (WorkerSinkTask. java: 464) в org. apache .kafka.connect.runtime.WorkerSinkTask.poll (WorkerSinkTask. java : 320) в орг. apache .kafka.connect.runtime.WorkerSinkTask.iteration (WorkerSinkTask. java: 224) в орг. apache .kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask. java : 192) в орг. apache .kafka.connect.runtime.WorkerTask.doRun (WorkerTask. java: 177) в орг. apache .kafka.connect.runtime.WorkerTask.run (WorkerTask. java : 227) в java .util.concurrent.Executors $ RunnableAdapter.call (Executors. java: 511) в java .util.concurrent.FutureTask.run (FutureTask. java: 266) в java .util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в java .langTh run (Thread. java: 748) Причина: org. apache .kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1. Причина: org. apache .kafka.common.errors. Исключение SerializationException: неизвестный магический байт!

Так что метод send имеет значение. Вот как я это делаю (Python, confluent-kafka- python):

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.poll(0)
producer.produce(topic, json.dumps({
   'viewtime': 123,
   'userid': 'user_1',
   'pageid': 'page_1'
}).encode('utf8'), on_delivery=kafka_delivery_report)
producer.flush()

Может быть, мне следует предоставить схему с сообщением (AvroProducer)?

Ответы [ 2 ]

1 голос
/ 02 февраля 2020

topi c ожидает сообщение типа Avro.

AvroProducer из confluent-kafka-python делает свое дело:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "namespace": "ksql",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "viewtime",
       "type" : "long"
     }, 
     {
       "name" : "userid",
       "type" : "string"
     }, 
     {
       "name" : "pageid",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "ksql",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "pageid",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://schema_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()
1 голос
/ 02 февраля 2020

Ваша проблема возникает из-за того, что вы пытаетесь использовать Avro converter для чтения данных из topi c, то есть , а не Avro .

Существует два возможных решения:

1. Переключите разъем приемника Kafka Connect для использования правильного преобразователя

Например, если вы потребляете JSON данных из топки Kafka c в приемник Kafka Connect:

...
value.converter=org.apache.kafka.connect.json.JsonConverter. 
value.converter.schemas.enable=true/false
...

value.converter.schemas.enable зависит от того, содержит ли сообщение схему.

2. Переключите формат восходящего потока на Avro

Чтобы DatagenConnector генерировал сообщения для Kafka, где формат значения сообщения Avro, установите параметры value.converter и value.converter.schema.registry.url:

...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
...

Подробнее см. Kafka-connect-datagen docs .


Great статья о преобразователях Kafka Connect и сериализации.

...