Я выполнил работу с Hazelcast Jet, который преобразует поток измерений IoT в поток сигналов тревоги.
Таким образом, когда уровень влажности одного датчика превышает пороговое значение, возникает аварийный сигнал. Когда он снова падает ниже порога, сигнал тревоги сбрасывается. Может быть до 3 уровней порогов (серьезность).
В настоящее время у меня возникают проблемы при запуске задания. Он удалит все буферизованные события из моего источника RabbitMQ. Итак, дальние события упорядочены, потому что локальный параллелизм один (давайте предположим, что здесь один кластер) Но мы события отправляются в пул кооперативных потоков, нет гарантии на заказ. Могу ли я поручить Jet обработать все события с одним и тем же идентификатором датчика по порядку?
Вот текущее определение моего конвейера:
StreamStage<Notification> ss = l
.drawFrom(
Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
.map(e -> makeMeasurement(e))
.flatMap(e -> checkThresholds(e))
.flatMap(e -> checkNotification(e));
ss.drainTo(Sinks.logger());
checkNotification сравнивает серьезность события с последней серьезностью для этого датчика. Вот почему порядок важен.
Я попытался реализовать решение, предложенное Гоханом Онером:
Я изменил исходный код для вывода объектов SimpleMeasurement. Таким образом, я могу добавить метку времени сразу после источника.
StreamStage<Notification> ss = l
.drawFrom(Sources.<SimpleEntry<Integer, SimpleMeasurement>> streamFromProcessor("rabbitmq",
ReadRabbitMQP.readRabbitMQ(mGroupNames, mLocalParallelism)))
.addTimestamps(e -> e.getValue().getTimestamp().toEpochMilli(), 1000)
.flatMap(e -> checkThresholds(e))
.groupingKey(e -> e.getSensorId())
.window(WindowDefinition.tumbling(1))
.aggregate(AggregateOperations.sorting(DistributedComparator.comparing(e -> e.getPeakTime())))
.flatMap(e -> checkNotification(e));
ss.drainTo(Sinks.logger());
С этим кодом события по-прежнему не обрабатываются для того же идентификатора датчика. Более того, с момента считывания события из источника до 20 секунд происходит задержка 20 секунд.