Kafka Connect, получить схему Json для JsonConverter - PullRequest
0 голосов
/ 05 февраля 2019

Я пытаюсь настроить kafka-разъем с помощью пользовательского преобразователя значений .

Я использую kafka для передачи сериализованных объектов Thrift.
Я хочу установить kafka-коннектор, который десериализует thrift-сообщения, конвертирует их в json и отправляет вasticsearch.

Метод org.apache.kafka.connect.storage.Converter#toConnectData возвращает SchemaAndValue, для чего нужно org.apache.kafka.connect.data.Schema.
Как мне получить эту схему для моего json?

Что я пробовал такfar:

Я пытался расширить org.apache.kafka.connect.json.JsonConverter, но у него есть собственная схема откуда-то.

Я пытался сгенерировать схему, используя эту библиотеку: https://github.com/reinert/JJSchema,, но JsonConverter кажется, что он имеет свой собственный формат: он ожидает map вместо object и т. д.
См .: https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L408

Несмотря на то, что я отключил схему ("value.converter.schemas.enable":"false") в моей конфигурации,разъем по-прежнему падает и жалуется на схему. Откуда взялась эта схема?Как они его генерируют?

Я собираюсь написать метод, который рекурсивно переименовывает все «неправильные» вещи в схеме json, но это слишком неудобно. Есть ли правильный подход?

UPD: моя конфигурация

{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "mytopic",
    "key.ignore": "true",
    "connection.url": "https://my-elastic:443",
    "type.name": "event",
    "elasticsearch.index.prefix" : "kafka",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "com.example.ThriftToJsonDeserializer",
    "value.converter.schemas.enable":"false"
}

1 Ответ

0 голосов
/ 08 февраля 2019

Проблема заключается в том, что Elasticsearch Connector пытается вывести сопоставление дляasticsearch на основе схемы сообщений.Схема сообщения создается Converter и изменяется Transforms.Если вы установите value.converter.schemas.enable на false, ваши схемы записей будут нулевыми.

Необходимо установить для schema.ignore значение true, и Elasticsearch Connector не будет выводить схему.

...