Я перерабатываю конвейер прогнозирования в реальном времени для потоковой передачи данных датчика IoT. Конвейер принимает образцы данных датчика, структурированные как (sensor_id, timestamp, sample_index, value)
, поскольку они создаются в исходной системе, сохраняют их локально и запускают пакетные задания pyspark для алгоритмов обучения и прогнозирования.
В настоящее время данные датчика сохраняются в локальные файлы на диске с одним файлом на датчик и в HDFS для потоковой передачи искры. Задание потоковой передачи собирает каждую микропакет, вычисляет, сколько образцов поступило для каждого датчика, и решает, в каких датчиках накоплено достаточно новых данных, чтобы сделать новый прогноз. Затем он сопоставляет каждую строку датчика в СДР с методом, который открывает файл данных с использованием метода python open
, сканирует последнюю обработанную выборку, собирает данные из этой выборки и далее, а также некоторые исторические данные, необходимые для прогнозирования, и запускает задание на кластер искр. Кроме того, для каждого фиксированного числа выборок каждый алгоритм требует перестройки, которая запрашивает длинную историю из того же хранилища данных и запускается на искровом кластере.
Наконец, RDD, который обрабатывается заданием прогнозирования, выглядит следующим образом это:
|-----------------------------|
| sensor_id | sensor_data |
|-----------------------------|
| SENSOR_0 | [13,52,43,54,5] |
| SENSOR_1 | [22,42,23,3,35] |
| SENSOR_2 | [43,2,53,64,42] |
|-----------------------------|
Мы сейчас сталкиваемся с проблемой масштаба при мониторинге нескольких сотен тысяч датчиков. Кажется, что наиболее дорогостоящей операцией во время процесса является чтение данных из файлов - задержка в несколько десятков миллисекунд при чтении каждого файла накапливается до неуправляемой задержки для всего задания прогнозирования. Кроме того, хранение данных в виде плоских файлов на диске вообще не масштабируется.
Мы рассматриваем изменение метода хранения для повышения производительности и обеспечения масштабируемости. Использование баз данных временных рядов (мы попробовали timescaledb & effxdb) ставит проблему запроса данных для всех датчиков в одном запросе, когда каждый датчик должен запрашиваться из другого момента времени, а затем группировать отдельные выборки в sensor_data
Столбец, как показано выше, который очень дорогой, вызывает много тасовок и даже не справляется с решением плоских файлов. Мы также пробуем файлы паркетных файлов, но их поведение при однократной записи затрудняет планирование структуры данных, которая будет хорошо работать в этом случае.
tl; dr - я ищу эффективную архитектуру для следующего сценария:
- данные датчика потоковой передачи поступают в режиме реального времени
- , когда датчик накапливает достаточное количество выборок, текущие + исторические данные c запрашиваются и отправляются на задание прогнозирования
- Каждое задание прогнозирования обрабатывает все датчики, которые достигли порогового значения в последней микропакете
- СДР содержит строки идентификатора датчика и упорядоченный массив всех запрашиваемых выборок