Я не знаю о KCQL от Landoop, но предполагая, что тема является частью значения сообщения, вы можете переместить ее на ключ следующим образом:
transforms=ReplaceKey,ExtractKey
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
# change the field accordingly
transforms.ReplaceKey.fields=mqtt_topic
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
# make sure this is the same field as above
transforms.ExtractKey.field=mqtt_topic
Если нет, то вы можете вставить его статически
transforms=AddKey
transforms.AddKey.type=org.apache.kafka.connect.transforms.InsertField$Key
# The exclamation makes this a required field
transforms.AddKey.static.field=mqtt_topic!
transforms.AddKey.static.value="<<your topic name>>"
Однако приведенное выше может не работать с SELECT * FROM +
, где вы выбираете из всех тем MQTT