Мы пытаемся настроить исходный соединитель от mqtt до Landoop Kafka (после того, как нам понадобится приемник для InfluxDB).
Полезные данные в формате JSON, и мы не можем это изменить.
Некоторые поля обнуляются.
Также мы используем «косую черту» для разделения подтем (например, `Test / 1, Test / 2 и т. Д.).
Мы пытались использовать JsonConverterWithSchemaEvolution
, но это вызывает проблемы с косой чертой.
В то же время использование JsonSimpleConverter
, по-видимому, не поддерживает null
, а использование автоматически сгенерированной схемы с первым преобразователем приводит к «несовместимой» ошибке схемы.
Что мы можем сделать?
Конфигурация источника данных MQTT:
{
"name": "DataSource",
"config":
{
"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
"connect.mqtt.username": USER,
"connect.mqtt.password": PASS,
"tasks.max": "1",
"connect.mqtt.kcql": "INSERT INTO TestJson SELECT * FROM Test/1 WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
"connect.mqtt.service.quality": "2",
"connect.mqtt.hosts": "tcp://mqtt-broker:1883"
}
}
Пример сообщения JSON:
{
"TimeStamp":"24/07/2018 14:38:00.2650000",
"unit":"U3",
"Acc1":36.0,
"PPR":null,
}
Автоматически сгенерированное значение схемы
{
"type":"record",
"name":"TestJson",
"fields":[
{
"name":"TimeStamp",
"type":["null","string"],
"default":null
},{
"name":"unit",
"type":["null","string"],
"default":null
},{
"name":"Acc1",
"type":["null","double"],
"default":null
},{
"name":"PPR",
"type":["null","string"],
"default":null
}
],
"connect.name":"TestJson"
}
Исключение при использовании автоматически сгенерированной схемы и JsonSimpleConverter
:
org.apache.kafka.connect.errors.DataException: TestJson
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:77)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:253)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"TestJson","fields":[{"name":"TimeStamp","type":"string"},{"name":"unit","type":"string"},{"name":"Acc1","type":"double"},{"name":"PPR","type":"string"}],"connect.name":"TestJson"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:203)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:320)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:312)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:115)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:154)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:116)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:253)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)