Как соединитель Kafka mqtt может отправить тему mqtt в качестве ключа? - PullRequest
0 голосов
/ 19 ноября 2018

У меня работает брокер MQTT и брокер Kafka, я использовал kafka-разъем: https://github.com/Landoop/stream-reactor, со следующей конфигурацией:

name=Mqtt-Source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO test SELECT * FROM + WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
connect.mqtt.connection.clean=true
connect.mqtt.connection.timeout=1000
connect.mqtt.connection.keep.alive=1000
connect.mqtt.client.id=test_mqtt_connector
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.service.quality=1

В kcql я определяюполе сообщения, которое kafka должен принять в качестве ключа, в любом случае использовать mqtt-topic в качестве ключа?Поэтому мне не нужно определять WITHKEY() в kcql.

1 Ответ

0 голосов
/ 19 ноября 2018

Я не знаю о KCQL от Landoop, но предполагая, что тема является частью значения сообщения, вы можете переместить ее на ключ следующим образом:

transforms=ReplaceKey,ExtractKey
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
# change the field accordingly
transforms.ReplaceKey.fields=mqtt_topic
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
# make sure this is the same field as above
transforms.ExtractKey.field=mqtt_topic

Если нет, то вы можете вставить его статически

transforms=AddKey
transforms.AddKey.type=org.apache.kafka.connect.transforms.InsertField$Key
# The exclamation makes this a required field
transforms.AddKey.static.field=mqtt_topic!
transforms.AddKey.static.value="<<your topic name>>"

Однако приведенное выше может не работать с SELECT * FROM +, где вы выбираете из всех тем MQTT

...