Pyspark выбирает несколько упорядоченных потоков данных в один RDD - PullRequest
1 голос
/ 11 февраля 2020

Я перерабатываю конвейер прогнозирования в реальном времени для потоковой передачи данных датчика IoT. Конвейер принимает образцы данных датчика, структурированные как (sensor_id, timestamp, sample_index, value), поскольку они создаются в исходной системе, сохраняют их локально и запускают пакетные задания pyspark для алгоритмов обучения и прогнозирования.

В настоящее время данные датчика сохраняются в локальные файлы на диске с одним файлом на датчик и в HDFS для потоковой передачи искры. Задание потоковой передачи собирает каждую микропакет, вычисляет, сколько образцов поступило для каждого датчика, и решает, в каких датчиках накоплено достаточно новых данных, чтобы сделать новый прогноз. Затем он сопоставляет каждую строку датчика в СДР с методом, который открывает файл данных с использованием метода python open, сканирует последнюю обработанную выборку, собирает данные из этой выборки и далее, а также некоторые исторические данные, необходимые для прогнозирования, и запускает задание на кластер искр. Кроме того, для каждого фиксированного числа выборок каждый алгоритм требует перестройки, которая запрашивает длинную историю из того же хранилища данных и запускается на искровом кластере.

Наконец, RDD, который обрабатывается заданием прогнозирования, выглядит следующим образом это:

|-----------------------------|
| sensor_id | sensor_data     |
|-----------------------------|
| SENSOR_0  | [13,52,43,54,5] |
| SENSOR_1  | [22,42,23,3,35] |
| SENSOR_2  | [43,2,53,64,42] |
|-----------------------------|

Мы сейчас сталкиваемся с проблемой масштаба при мониторинге нескольких сотен тысяч датчиков. Кажется, что наиболее дорогостоящей операцией во время процесса является чтение данных из файлов - задержка в несколько десятков миллисекунд при чтении каждого файла накапливается до неуправляемой задержки для всего задания прогнозирования. Кроме того, хранение данных в виде плоских файлов на диске вообще не масштабируется.

Мы рассматриваем изменение метода хранения для повышения производительности и обеспечения масштабируемости. Использование баз данных временных рядов (мы попробовали timescaledb & effxdb) ставит проблему запроса данных для всех датчиков в одном запросе, когда каждый датчик должен запрашиваться из другого момента времени, а затем группировать отдельные выборки в sensor_data Столбец, как показано выше, который очень дорогой, вызывает много тасовок и даже не справляется с решением плоских файлов. Мы также пробуем файлы паркетных файлов, но их поведение при однократной записи затрудняет планирование структуры данных, которая будет хорошо работать в этом случае.

tl; dr - я ищу эффективную архитектуру для следующего сценария:

  1. данные датчика потоковой передачи поступают в режиме реального времени
  2. , когда датчик накапливает достаточное количество выборок, текущие + исторические данные c запрашиваются и отправляются на задание прогнозирования
  3. Каждое задание прогнозирования обрабатывает все датчики, которые достигли порогового значения в последней микропакете
  4. СДР содержит строки идентификатора датчика и упорядоченный массив всех запрашиваемых выборок
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...