Самый эффективный способ создания функций из ведущих коэффициентов DCT агрегированных данных - PySpark - PullRequest
0 голосов
/ 06 мая 2019

В настоящее время я создаю функции для набора данных, который состоит из данных временного ряда для различных показаний датчиков некоторых единиц оборудования, которые, вероятно, связаны с событиями неисправности того же самого.Основная структура этих данных состоит в том, что у нас есть своего рода таблица, которая объединяет идентификатор оборудования, временные метки и показания датчиков.

| ID | Cycle_ID | Timestamp  | sensor_1 | sensor_2 |
|----|----------|------------|----------|----------|
| 1  | 1        | 1547142555 | 123      | 641      |
| 1  | 1        | 1547142556 | 123      | 644      |
| 1  | 2        | 1547142557 | 124      | 643      |

В настоящее время идея состоит в агрегировании данных на основе циклов для созданияпоследовательность (и соответствующие особенности), соответствующие тем.Объем необработанных данных огромен и требует использования искры, но результирующий набор данных после агрегирования достаточно мал, чтобы использовать его для хранения в DF Pandas и построения модели с кератами.Среди прочего, одна идея состоит в том, чтобы собрать ведущие компоненты DCT для некоторых датчиков, чтобы использовать их в качестве функции.Чтобы сделать это, мы делаем (среди прочего) следующее агрегирование:


from pyspark.sql import Row, window
import pyspark.sql.functions as func

W = window.Window.partitionBy('ID', 'Cycle_ID').orderBy('Timestamp')

df_collect = pfr_flight_match.withColumn('sensor_1_coll', 
                 func.collect_list('sensor_1').over(W)) \
                 .groupBy('ID', 'Cycle_ID') \ 
                 .agg(func.max("sensor_1_coll").alias('sensor_1_coll'))

Это дает мне, для каждого цикла каждого оборудования в отдельности, временной ряд датчика в виде массива.Идея состоит в том, чтобы теперь выполнить DCT, сохранить только ведущие коэффициенты n и добавить их отдельно в качестве столбцов новых функций.Я нашел способ сделать это, однако производительность кажется ужасной, поэтому я ищу помощи.

Поскольку, к сожалению, невозможно использовать DCT Pyspark в массиве (согласно документации, функция должна быть типа DenseVector), нам нужно преобразовать собранный массив в DenseVector.Мне кажется, что не существует эффективного способа, поэтому я использую UDF для этого:

import pyspark.ml
to_vec = func.udf(lambda x: pyspark.ml.linalg.DenseVector(x),
                  pyspark.ml.linalg.VectorUDT())

Следующим шагом является выполнение самого DCT, используя что-то вроде этого:

# Determine which column is the target of DCT
col_to_transform = 'sensor_1_coll'
df = df_collect.withColumn('vec', to_vec(col_to_transform))

# After switching the column type to DenseVector, we can apply DCT
dct = pyspark.ml.feature.DCT(inverse=False, inputCol='vec', outputCol='vec_dct')
df_dct = dct.transform(df)

# Drop intermediate columns
df_dct = df_dct.drop('vec', col_to_transform)

Теперь наступает момент, когда я опасаюсь ловушки: нам нужно усечь вектор DCT до некоторого количества коэффициентов, которые затем нужно разбить на отдельные столбцы для последующей передачи их в массив Pandas DF / Numpy..

Боюсь, что использование UDF не очень хорошо с точки зрения производительности;и в любом случае DenseVector не представлен как тип массива.Так что это здесь не работает:

import pyspark.ml
trunc_vec = func.udf(lambda x: x[0:n],
                  pyspark.ml.linalg.VectorUDT())

Итак, что я в конце концов сделал, так это сопоставил подходящую функцию с версией RDD вышеупомянутого DF и вернул ее в виде фрейма данных.Это то, что я сейчас использую:

# State columns used for grouping
idx = ['ID', 'Cycle_ID']
keep_coeffs = 30 # How many of the leading coefficients shall be kept?

from functools import partial

# To be mapped onto rdd: Return auxillary columns plus the DCT coeffs as 
# individual columns, which are named serially
 def truncate_dct_vec(vec, coeffs):
    return tuple(vec[i] for i in idx) + tuple(vec.vec_dct.toArray()[0:coeffs+1].tolist())
truncate_dct_vec = partial(truncate_dct_vec, coeffs=keep_coeffs)

# Perform the mapping to get the truncated DCT coefficients, each in an individual column
df_dct = df_dct.rdd.map(truncate_dct_vec).toDF(idx)

Проблема в том, что это кажется очень медленным (вероятно, из-за сериализации и преобразования между JVM и python, выполняющими все эти шаги),что почти запредельно.Я в основном ищу более быстрые альтернативы.Любая помощь с этим приветствуется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...