Вы могли бы начать с изучения KSQL ?Не зная вашей схемы, вот наивный запрос к теоретической теме Кафки с живыми данными RSSI:
CREATE STREAM rssi_data
(device_id VARCHAR,
battery_level INT,
SIGNAL FLOAT)
WITH (TOPIC='rssi_data', VALUE_FORMAT='json', KEY='device_id');
, а затем запрос к ней:
CREATE STREAM low_battery_devices AS SELECT * FROM rssi_data WHERE battery_level < 20;
Это создаст новую тему Кафки под названием low_bettery_devices
, который будет содержать события для устройств, батарея которых ниже.Затем вы, конечно, можете использовать эту тему с другим запросом KSQL, или программой R, или какой-либо другой нисходящей Kafka-совместимой системой.