Соединитель раковины neo4j - Avro - неизвестный маги c байт - PullRequest
0 голосов
/ 18 февраля 2020

Я столкнулся с приведенной ниже ошибкой. При добавлении коннектора приемника neo4j на платформе слияния.

Причина: org. apache .kafka.connect.errors.DataException: JsonConverter со схемами. enable требует полей "схема" и "полезная нагрузка" и может не содержать дополнительных полей. Если вы пытаетесь десериализовать простые данные JSON, установите schemas.enable = false в конфигурации вашего конвертера. в орг. apache .kafka.connect. json .JsonConverter.toConnectData (JsonConverter. java: 359)

. Проверьте следующую конфигурационную часть:

curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{

    "topics": "pls.bookneo",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "*****"

}'

Обновлены следующие изменения: curl -X PUT http://localhost: 8083 / connectors / Neo4j-Sink-Connect-book / config -H "Content-Type: application / json" -d ' {

"topics": "pulseTest.bookneo",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter.schema.registry.url": "http://localhost:8081", 
"value.converter.schema.registry.url": "http://localhost:8081", 
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"true",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "pulse",
"neo4j.encryption.enabled": false,
"neo4j.topic.cdc.schema": "pulseTest.bookneo"

} '

теперь сталкиваюсь с проблемой:

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pulseTest.bookneo to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Теперь мой соединитель работает нормально, но узел в neo4j создан пустым.

curl -X PUT http://localhost: 8083 / разъемы / Neo4j-Sink-Connect-projectp / config -H "Тип содержимого: приложение / json" -d '{

"topics": "projectsp",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"errors.tolerance": "all",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "pulse",
"neo4j.encryption.enabled": false,
"neo4j.topic.cypher.projectsp": " MERGE (p:projects{projectname:coalesce(event.projectname,0),count:coalesce(event.count,0)})  "

} '

это мой зашифрованный запрос: "MERGE (p: projects {projectname: coalesce (event.projectname, 0), count: coalesce (event.count, 0)} ) "

Я приложил данные из топи c

Ответы [ 2 ]

1 голос
/ 18 февраля 2020

Вы также используете JsonConverter в key, поэтому необходимо указать также schemas.enable:

curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{
    "topics": "pls.bookneo",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "*****"

}'

Подробнее о преобразователях см. https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

0 голосов
/ 27 февраля 2020

Неизвестные волхвы c байт!

Ваш топи c не содержит данных Avro, созданных клиентом Confluent, поэтому при десериализаторе происходит сбой

Вы можете проверить то же самое с помощью консоли пользователя Kafka avro

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...