Как проверить, была ли отправлена ​​новая запись в данный период времени, используя kafka и faust - PullRequest
0 голосов
/ 25 февраля 2019

Я использую тестовую установку, включая платформу (докер), и обрабатываю записи со следующей информацией: идентификатор датчика, метка времени, значение.Используя robinhood faust (похожий на Kafka Streams, но в python), я пытаюсь сделать следующее:

Когда есть новая запись для датчика, должен быть «таймер» и если нет новой записи для этого датчика-ID получен в течение заданного времени, должна быть ошибка, указывающая на возможную неисправность этого датчика / машины.

Я пытался использовать time.sleep(), но происходит то, что он просто спит в течение 10 секунд, а затем обрабатывает следующую запись.

Возможно ли сделать что-то подобное с настройкойЯ использую?

1 Ответ

0 голосов
/ 26 февраля 2019

Вы можете использовать Поворот окна KSQL :

Создать поток сенсорной информации;

CREATE STREAM sensorinformation \
  (sensorid VARCHAR, \
   sensortimestamp BIGINT, \
   value VARCHAR) \
 WITH (KAFKA_TOPIC='sensorinformationtopic', \
       VALUE_FORMAT='DELIMITED', \
       KEY='sensorid', \
       TIMESTAMP='sensortimestamp');

И, наконец, создайте таблицу, содержащую неисправные датчики, которые появляются только один раз за 10 секунд:

CREATE TABLE faulty_sensors AS \
  SELECT sensorid, \
         count(*) \
  FROM sensorinformation \
  WINDOW TUMBLING (SIZE 10 SECONDS) \
  GROUP BY sensorid \
  HAVING count(*) = 1;
...