Синхронизировать данные из нескольких источников данных - PullRequest
8 голосов
/ 27 мая 2019

Наша команда пытается создать систему прогнозного обслуживания, задача которой состоит в том, чтобы посмотреть на набор событий и предсказать, отражают ли эти события набор известных аномалий.

Мы находимся на этапе проектирования итекущий дизайн системы выглядит следующим образом:

  • События могут происходить в нескольких источниках системы IoT (таких как облачная платформа, пограничные устройства или любые промежуточные платформы)
  • Событиявыталкивается источниками данных в систему очередей сообщений (в настоящее время мы выбрали Apache Kafka).
  • Каждый источник данных имеет свою очередь (тема Кафки).
  • Из очередей данныепотребляется несколькими механизмами логического вывода (которые на самом деле являются нейронными сетями).
  • В зависимости от набора функций механизм вывода будет подписываться на несколько тем Kafka и передавать данные из этих тем для непрерывного вывода вывода.
  • Общая архитектура следует принципу единой ответственности, означающему, чтокаждый компонент будет отделен друг от друга и будет работать в отдельном контейнере Docker.

Проблема:

Чтобы классифицировать набор событий как аномалию, события должны происходитьв том же временном окне.например, есть три источника данных, которые помещают свои соответствующие события в темы Кафки, но по какой-то причине данные не синхронизируются.Таким образом, один из механизмов вывода извлекает последние записи из каждой темы kafka, но соответствующие события в извлеченных данных не принадлежат одному и тому же временному окну (скажем, 1 час).Это приведет к неверным прогнозам из-за несинхронизированных данных.

Вопрос

Нам нужно выяснить, как мы можем убедиться, что данные из всех трех источников расположены в порядкетак что когда механизм логического вывода запрашивает записи (скажем, последние 100 записей) из нескольких тем kakfa, соответствующие записи в каждой теме принадлежат одному и тому же временному окну?

Ответы [ 4 ]

1 голос
/ 06 июня 2019

Следующие рекомендации должны максимизировать успех синхронизации событий для проблемы обнаружения аномалий с использованием данных временных рядов.

  1. Использовать сетевой синхронизатор времени на всех узлах производителя / потребителя
  2. Использовать сердцебиениесообщение от производителей каждые х единиц времени с фиксированным временем начала.Например: сообщения отправляются каждые две минуты в начале минуты.
  3. Построение предикторов для задержки сообщения производителя.используйте сообщения сердцебиения, чтобы вычислить это.

С этими примитивами мы сможем выровнять события временных рядов, учитывая смещения времени из-за сетевых задержек.

В механизме логического выводаКроме того, разверните окна на уровне каждого производителя, чтобы синхронизировать события между производителями.

1 голос
/ 31 мая 2019

Чтобы справиться с этим сценарием, источники данных должны предоставить потребителю некоторый механизм, чтобы понять, что все соответствующие данные получены. Самое простое решение - опубликовать пакет из источника данных с идентификатором пакета (Guid) некоторой формы. Затем потребители могут подождать, пока не появится идентификатор следующей партии, отмечающий конец предыдущей партии. Этот подход предполагает, что источники не будут пропускать пакет, в противном случае они будут постоянно выровнены. Не существует алгоритма для обнаружения этого, но у вас могут быть некоторые поля в данных, которые показывают разрывность и позволяют выровнять данные.

Более слабая версия этого подхода состоит в том, чтобы либо просто подождать х-секунд и предположить, что все источники преуспели за это много времени, либо посмотреть на какую-либо форму временных меток (логических или настенных часов), чтобы обнаружить, что источник перешел на в следующем временном окне неявно отображается завершение последнего окна.

1 голос
/ 05 июня 2019

Я бы предложил KSQL , который представляет собой потоковый движок SQL, который позволяет обрабатывать данные в реальном времени с использованием Apache Kafka.Он также обеспечивает хорошую функциональность для оконного агрегирования и т. Д.

Существует три способа определения Windows в KSQL:

скачкообразных окон, падающих окон и окон сеансов,Прыгающие и падающие окна являются временными окнами, потому что они определяются фиксированной продолжительностью, которую вы указываете.Окна сеансов имеют динамический размер на основе входящих данных и определяются периодами активности, разделенными промежутками бездействия.

В вашем контексте вы можете использовать KSQL для запроса и агрегирования интересующих тем, используя Оконные соединения .Например,

SELECT t1.id, ...
  FROM topic_1 t1
  INNER JOIN topic_2 t2
    WITHIN 1 HOURS
    ON t1.id = t2.id;
1 голос
/ 30 мая 2019

Некоторые предложения -

  1. Обработка задержки на стороне производителя - Убедитесь, что все три производителя всегда отправляют данные синхронно с темами Kafka, используя batch.size и linger.ms.например.если для linger.ms задано значение 1000, все сообщения будут отправлены в Kafka в течение 1 секунды.

  2. Обработка задержки на стороне потребителя - с учетом любого механизма потоковой передачи на стороне потребителя (будь тоKafka-stream, spark-stream, Flink) обеспечивает функциональность окон для объединения / агрегирования потоковых данных на основе ключей с учетом отложенной оконной функции.

Проверьте это - Flink windows для справки о том, каквыберите правильный тип окна ссылка

...