Как убедиться, что кортежи с одинаковым ключом обрабатываются по порядку - PullRequest
0 голосов
/ 08 января 2019

Я выполнил работу с Hazelcast Jet, который преобразует поток измерений IoT в поток сигналов тревоги.

Таким образом, когда уровень влажности одного датчика превышает пороговое значение, возникает аварийный сигнал. Когда он снова падает ниже порога, сигнал тревоги сбрасывается. Может быть до 3 уровней порогов (серьезность).

В настоящее время у меня возникают проблемы при запуске задания. Он удалит все буферизованные события из моего источника RabbitMQ. Итак, дальние события упорядочены, потому что локальный параллелизм один (давайте предположим, что здесь один кластер) Но мы события отправляются в пул кооперативных потоков, нет гарантии на заказ. Могу ли я поручить Jet обработать все события с одним и тем же идентификатором датчика по порядку?

Вот текущее определение моего конвейера:

  StreamStage<Notification> ss = l
        .drawFrom(
              Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
        .map(e -> makeMeasurement(e))
        .flatMap(e -> checkThresholds(e))
        .flatMap(e -> checkNotification(e));

  ss.drainTo(Sinks.logger());  

checkNotification сравнивает серьезность события с последней серьезностью для этого датчика. Вот почему порядок важен.


Я попытался реализовать решение, предложенное Гоханом Онером: Я изменил исходный код для вывода объектов SimpleMeasurement. Таким образом, я могу добавить метку времени сразу после источника.

StreamStage<Notification> ss = l
      .drawFrom(Sources.<SimpleEntry<Integer, SimpleMeasurement>> streamFromProcessor("rabbitmq",
                  ReadRabbitMQP.readRabbitMQ(mGroupNames, mLocalParallelism)))
      .addTimestamps(e -> e.getValue().getTimestamp().toEpochMilli(), 1000)
      .flatMap(e -> checkThresholds(e))
      .groupingKey(e -> e.getSensorId())
      .window(WindowDefinition.tumbling(1))
      .aggregate(AggregateOperations.sorting(DistributedComparator.comparing(e -> e.getPeakTime())))
      .flatMap(e -> checkNotification(e));

ss.drainTo(Sinks.logger());

С этим кодом события по-прежнему не обрабатываются для того же идентификатора датчика. Более того, с момента считывания события из источника до 20 секунд происходит задержка 20 секунд.

1 Ответ

0 голосов
/ 08 января 2019

@ PeeWee2201, поскольку это распределенный поток, нет гарантированного заказа. Но если вы хотите обрабатывать уведомления от тех же датчиков по порядку, то вам необходимо:

  • добавить метку времени к событиям
  • группировка по идентификатору датчика
  • определить окно, 10 с, 30 с и т. Д., Чтобы события могли быть агрегированы в этом окне
  • Сортировка всех событий на основе любого свойства в одном окне

Итак, работа должна выглядеть так:

  StreamStage<Notification> ss = l
        .drawFrom(
              Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
        .addTimestamps(...., ...)
        .groupingKey(....)
        .window(WindowDefinition.tumbling(....))
        .aggregate(AggregateOperations.sorting(....))

Если makeMeasurement(e) - это шаг, который преобразует данные и может выполняться параллельно, вы можете добавить его перед группировкой.

После этого у вас будет список объектов для метода checkThresholds: все сообщения в окне для одного и того же сенсорного идентификатора упорядочены по времени прибытия или в любом порядке сортировки, который вы использовали.

Я верю, что это поможет решить вашу проблему.

...