Я пытаюсь настроить kafka-разъем с помощью пользовательского преобразователя значений .
Я использую kafka для передачи сериализованных объектов Thrift.
Я хочу установить kafka-коннектор, который десериализует thrift-сообщения, конвертирует их в json и отправляет вasticsearch.
Метод org.apache.kafka.connect.storage.Converter#toConnectData
возвращает SchemaAndValue
, для чего нужно org.apache.kafka.connect.data.Schema
.
Как мне получить эту схему для моего json?
Что я пробовал такfar:
Я пытался расширить org.apache.kafka.connect.json.JsonConverter
, но у него есть собственная схема откуда-то.
Я пытался сгенерировать схему, используя эту библиотеку: https://github.com/reinert/JJSchema,, но JsonConverter
кажется, что он имеет свой собственный формат: он ожидает map
вместо object
и т. д.
См .: https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L408
Несмотря на то, что я отключил схему ("value.converter.schemas.enable":"false"
) в моей конфигурации,разъем по-прежнему падает и жалуется на схему. Откуда взялась эта схема?Как они его генерируют?
Я собираюсь написать метод, который рекурсивно переименовывает все «неправильные» вещи в схеме json, но это слишком неудобно. Есть ли правильный подход?
UPD: моя конфигурация
{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "mytopic",
"key.ignore": "true",
"connection.url": "https://my-elastic:443",
"type.name": "event",
"elasticsearch.index.prefix" : "kafka",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "com.example.ThriftToJsonDeserializer",
"value.converter.schemas.enable":"false"
}