Схема Kafka JsonConverter с обнуляемым полем и тема с косой чертой - PullRequest
0 голосов
/ 28 марта 2019

Мы пытаемся настроить исходный соединитель от mqtt до Landoop Kafka (после того, как нам понадобится приемник для InfluxDB). Полезные данные в формате JSON, и мы не можем это изменить. Некоторые поля обнуляются. Также мы используем «косую черту» для разделения подтем (например, `Test / 1, Test / 2 и т. Д.).

Мы пытались использовать JsonConverterWithSchemaEvolution, но это вызывает проблемы с косой чертой. В то же время использование JsonSimpleConverter, по-видимому, не поддерживает null, а использование автоматически сгенерированной схемы с первым преобразователем приводит к «несовместимой» ошибке схемы.

Что мы можем сделать?

Конфигурация источника данных MQTT:

{
 "name": "DataSource",
 "config": 
 {
   "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
   "connect.mqtt.username": USER,
   "connect.mqtt.password": PASS,
   "tasks.max": "1",
   "connect.mqtt.kcql": "INSERT INTO TestJson SELECT * FROM Test/1 WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
   "connect.mqtt.service.quality": "2",
   "connect.mqtt.hosts": "tcp://mqtt-broker:1883"
 }
 }

Пример сообщения JSON:

{
   "TimeStamp":"24/07/2018 14:38:00.2650000",
   "unit":"U3",
   "Acc1":36.0,
   "PPR":null,
}

Автоматически сгенерированное значение схемы

{
   "type":"record",
   "name":"TestJson",
   "fields":[
      {
         "name":"TimeStamp",
         "type":["null","string"],
         "default":null
      },{
         "name":"unit",
         "type":["null","string"],
         "default":null
      },{
         "name":"Acc1",
         "type":["null","double"],
         "default":null
      },{
         "name":"PPR",
         "type":["null","string"],
         "default":null
      }
   ],
   "connect.name":"TestJson"
}

Исключение при использовании автоматически сгенерированной схемы и JsonSimpleConverter:

org.apache.kafka.connect.errors.DataException: TestJson
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:77)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:253)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"TestJson","fields":[{"name":"TimeStamp","type":"string"},{"name":"unit","type":"string"},{"name":"Acc1","type":"double"},{"name":"PPR","type":"string"}],"connect.name":"TestJson"}

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:203)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:320)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:312)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:115)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:154)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:116)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:253)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
...