Невозможно связать исходный разъем Kafka MQTT с приемным разъемом InfluxDB - PullRequest
0 голосов
/ 24 апреля 2018

Мы пытаемся связать разъем источника MQTT с разъемом приемника InfluxDB. Прямо сейчас первый работает нормально, но последний дает исключение ниже:

org.apache.kafka.connect.errors.ConnectException: выход WorkerSinkTask из-за неисправимого исключения. в org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages (WorkerSinkTask.java:484) в org.apache.kafka.connect.runtime.WorkerSinkTask.poll (WorkerSinkTask.java:265) в org.apache.kafka.connect.runtime.WorkerSinkTask.iteration (WorkerSinkTask.java:182) в org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:150) в org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:146) в org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:190) в java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) в java.util.concurrent.FutureTask.run (FutureTask.java:266) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) at java.lang.Thread.run (Thread.java:748)

Это файл конфигурации InfluxDB

connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
connect.influx.url=http://localhost:8086
connect.influx.db=iot
tasks.max=1
topics=simMetrics
connect.influx.kcql=INSERT INTO sensorMetrics SELECT * FROM simMetrics WITHTIMESTAMP sys_time()
name=influxdb-sink
connect.influx.username=""

Это структура сообщения:

{"отметка времени": 1524572345184, "раздел": 0, "ключ": {"тема": "machine / sensor / mytopic / test", "id": "1"}, "offset": 0, "topic": "simMetrics", "value": {"metrics": {"buzzer": 0, "led": 0, "water": false, "buzzer_timestamp": 1524571762798, "temperature_timestamp": 1524571762816, «water_timestamp»: 1524571762835, «fan»: 0, «light»: 500, "температура": 27,371554588194957, "assetName": "SIMopcua", "fan_timestamp": 1524571762791, "light_timestamp": 1524571762808, "led_timestamp": 1524571762827}}}

Конфигурация соединителя источника MQTT:

connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
name=mqtt-source
connect.mqtt.kcql=INSERT INTO simMetrics SELECT * FROM machine/sensor/mytopic/test WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
connect.mqtt.service.quality=1
connect.mqtt.hosts=tcp://192.168.208.203:1884

UPDATE
Мы обнаружили, что проблема заключается в формате значений температуры. Поскольку мы не настроили типы полей, InfluxDB воспринимает значения температуры как двойные. Все значения, имеющие десятичный разделитель, сохраняются правильно, проблема возникает, когда Kafka отправляет значения без десятичной части, пропуская десятичный разделитель. Как мы можем исправить эту проблему?
PS: Фактический обходной путь - добавить 0,00000001 ко всем входящим температурам.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...