ElasticsearchSinkConnector Не удалось десериализовать данные в Avro - PullRequest
0 голосов
/ 04 мая 2018

Я создал простейшую конфигурацию коннектора раковины kafka, и я использую confluent 4.1.0:

{
  "connector.class": 
  "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "test-type",
  "tasks.max": "1",
  "topics": "dialogs",
  "name": "elasticsearch-sink",
  "key.ignore": "true",
  "connection.url": "http://localhost:9200",
  "schema.ignore": "true"
}

и в теме я сохраняю сообщения в JSON

{ "topics": "resd"}

Но в результате я получаю ошибку:

Причина: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1 Вызывается: org.apache.kafka.common.errors.SerializationException: неизвестный магический байт!

Ответы [ 2 ]

0 голосов
/ 05 мая 2018

Как говорит cricket_007, вам нужно указать Connect использовать Jse deserialiser, если это тот формат, в котором находятся ваши данные. Добавьте это в конфигурацию вашего соединителя:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"
0 голосов
/ 05 мая 2018

Эта ошибка возникает из-за того, что она пытается прочитать сообщения Avro, не закодированные в Confluent Schema Registry.

Если данные темы - Avro, необходимо использовать реестр схем.

В противном случае, если данные темы - JSON, вы запустили кластер соединений с AvroConverter для ваших ключей или значений в файле свойств, где вместо этого вам нужно использовать JsonConverter

...