Отправка из Логаста sh в Кафку с Авро - PullRequest
1 голос
/ 22 апреля 2020

Я пытаюсь отправить данные из logsta sh в kafka с использованием схемы avro.

Мой вывод logsta sh выглядит следующим образом:

kafka{
  codec => avro {
    schema_uri => "/tmp/avro/hadoop.avsc"
  }
  topic_id => "hadoop_log_processed"
}

Мой файл схемы выглядит следующим образом :

{"type": "record",
 "name": "hadoop_schema",
 "fields": [
     {"name": "loglevel", "type": "string"},
     {"name": "error_msg",  "type": "string"},
     {"name": "syslog", "type": ["string", "null"]},
     {"name": "javaclass", "type": ["string", "null"]}
 ]
}

Вывод kafka-console-consumer:

CElORk+gAURvd24gdG8gdGhlIGxhc3QgbWVyZ2UtcGCzcywgd2l0aCA3IHNlZ21lbnRzIGxlZnQgb2YgdG90YWwgc256ZTogMjI4NDI0NDM5IGJ5dGVzAAxbbWFpbl0APm9yZy5hcGFjaGUuaGFkb29wLm1hcHJlZC5NZXJnZXI=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9OVGFza0hlYAJ0YmVhdEhhbmRsZXIgdGhyZWFkIGludGVycnVwdGVkAERbVGFza0hlYXJdYmVhdEhhbmRsZXIgUGluZ0NoZWNrZXJdAG5vcmcuYVBhY2hlLmhhZG9vcC5tYXByZWR1Y2UudjIuYXBwLlRhc2tIZWFydGJ3YXRIYW5kbGVy

В моем соединителе также появляется следующая ошибка:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hadoop_log_processed to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Я знаю, что кодирую данные на сайте logsta sh. Нужно ли декодировать сообщения во время ввода в kafka или я могу декодировать / десериализовать данные в конфигурации коннектора?

Есть ли способ отключить кодировку на сайте logsta sh? Я читал об опциях base64_encoding, но, похоже, у него нет опции.

1 Ответ

2 голосов
/ 22 апреля 2020

Проблема, с которой вы здесь сталкиваетесь, заключается в том, что код Avro sh Logsta *1029* не сериализует данные в форму Avro, которую ожидает Confluent Schema Registry Avro deserialiser.

В то время как Logsta sh принимает AVS c и кодирует данные в двоичную форму, основываясь на этом, сериализатор реестра Confluent Schema вместо этого сохраняет и получает схему непосредственно из реестра (не avsc файла).

Итак, когда вы получаете Failed to deserialize data … SerializationException: Unknown magic byte!, это десериализатор Avro, который говорит, что он не распознает данные как Avro, которые были сериализованы с использованием сериализатора реестра Schema.

У меня был быстрый Google, и я нашел этот код c, который выглядит так, как будто он поддерживает реестр схем (и, следовательно, Kafka Connect и любые другие потребительские десериализации данных Avro таким образом).

Либо запишите свои данные как JSON в Kafka и используйте org.apache.kafka.connect.json.JsonConverter в Kafka Connect, чтобы прочитать их из topi c.

Ссылка:

...