Как добавить новую тему Kafka для каждого нового производителя из IoT Hub? - PullRequest
3 голосов
/ 16 апреля 2019

Я работаю над облачным решением Azure.Я использую IoT Hub, подключенный к Kafka, для обработки данных, поступающих с различных устройств IoT .Я сталкиваюсь с тем, что все данные, поступающие с нескольких устройств, хранятся в одной теме.Однако я хочу обработать данные каждого устройства, подключенного к концентратору IoT, к определенной теме в Kafka (каждое устройство имеет свою собственную тему Kafka)

Toketi «Соединитель источника Kafka Connect для концентратора IoT Azure» предоставляетследующий файл конфигурации (пограничный узел)

connector.class=com.microsoft.azure.iot.kafka.connect.source.IotHubSourceConnector
name=AzureIotHubConnector
tasks.max=1
Kafka.Topic=IotTopic
IotHub.EventHubCompatibleName=iothub-toketi
IotHub.EventHubCompatibleEndpoint=sb://iothub-001.servicebus.windows.net/
IotHub.AccessKeyName=service
IotHub.AccessKeyValue=4KsdfiB9J899a+N3iwerjKwzeqbZUj1K//KKj1ye9i3=
IotHub.ConsumerGroup=$Default
IotHub.Partitions=4
IotHub.StartTime=2016-11-28T00:00:00Z
IotHub.Offsets=
BatchSize=100
ReceiveTimeout=60

Он работает для одной темы, чтобы хранить все данные с нескольких устройств, но я ожидаю сделать изоляцию между данными, поступающими с устройств

Любые решения или идеи!!

Спасибо

1 Ответ

2 голосов
/ 16 апреля 2019

Одним из решений является использование SMT (Single Message Transformation).

Поток соединителя источника содержит несколько шагов:

  • Опрос данных из внешнего источника как List<SourceRecord>
  • Преобразование каждого сообщения (SourceRecord) с использованием определенного SMT (может быть пропущено, если преобразование не определено
  • Преобразовать ключ и значение SourceRecord в Массивы байтов.
  • Отправить сообщение через KafkaProducer Kafka

Kafka Connect определяет, в какую тему отправлять сообщение, на основе поля SourceRecord::topic. Используя SMT, вы можете установить правильное значение темы.

Pure Apache Kafka Connect не имеет такой трансформации. Если вы используете Confluent Platform, доступны некоторые дополнительные преобразования. Для извлечения названия темы вы можете использовать ExtractTopic . У него есть свойство, которое называется field

Более подробную информацию о концепции SMT можно найти на веб-странице Apache Kafka или веб-странице Confluent

...