В настоящее время я создаю функции для набора данных, который состоит из данных временного ряда для различных показаний датчиков некоторых единиц оборудования, которые, вероятно, связаны с событиями неисправности того же самого.Основная структура этих данных состоит в том, что у нас есть своего рода таблица, которая объединяет идентификатор оборудования, временные метки и показания датчиков.
| ID | Cycle_ID | Timestamp | sensor_1 | sensor_2 |
|----|----------|------------|----------|----------|
| 1 | 1 | 1547142555 | 123 | 641 |
| 1 | 1 | 1547142556 | 123 | 644 |
| 1 | 2 | 1547142557 | 124 | 643 |
В настоящее время идея состоит в агрегировании данных на основе циклов для созданияпоследовательность (и соответствующие особенности), соответствующие тем.Объем необработанных данных огромен и требует использования искры, но результирующий набор данных после агрегирования достаточно мал, чтобы использовать его для хранения в DF Pandas и построения модели с кератами.Среди прочего, одна идея состоит в том, чтобы собрать ведущие компоненты DCT для некоторых датчиков, чтобы использовать их в качестве функции.Чтобы сделать это, мы делаем (среди прочего) следующее агрегирование:
from pyspark.sql import Row, window
import pyspark.sql.functions as func
W = window.Window.partitionBy('ID', 'Cycle_ID').orderBy('Timestamp')
df_collect = pfr_flight_match.withColumn('sensor_1_coll',
func.collect_list('sensor_1').over(W)) \
.groupBy('ID', 'Cycle_ID') \
.agg(func.max("sensor_1_coll").alias('sensor_1_coll'))
Это дает мне, для каждого цикла каждого оборудования в отдельности, временной ряд датчика в виде массива.Идея состоит в том, чтобы теперь выполнить DCT, сохранить только ведущие коэффициенты n
и добавить их отдельно в качестве столбцов новых функций.Я нашел способ сделать это, однако производительность кажется ужасной, поэтому я ищу помощи.
Поскольку, к сожалению, невозможно использовать DCT Pyspark в массиве (согласно документации, функция должна быть типа DenseVector), нам нужно преобразовать собранный массив в DenseVector.Мне кажется, что не существует эффективного способа, поэтому я использую UDF для этого:
import pyspark.ml
to_vec = func.udf(lambda x: pyspark.ml.linalg.DenseVector(x),
pyspark.ml.linalg.VectorUDT())
Следующим шагом является выполнение самого DCT, используя что-то вроде этого:
# Determine which column is the target of DCT
col_to_transform = 'sensor_1_coll'
df = df_collect.withColumn('vec', to_vec(col_to_transform))
# After switching the column type to DenseVector, we can apply DCT
dct = pyspark.ml.feature.DCT(inverse=False, inputCol='vec', outputCol='vec_dct')
df_dct = dct.transform(df)
# Drop intermediate columns
df_dct = df_dct.drop('vec', col_to_transform)
Теперь наступает момент, когда я опасаюсь ловушки: нам нужно усечь вектор DCT до некоторого количества коэффициентов, которые затем нужно разбить на отдельные столбцы для последующей передачи их в массив Pandas DF / Numpy..
Боюсь, что использование UDF не очень хорошо с точки зрения производительности;и в любом случае DenseVector не представлен как тип массива.Так что это здесь не работает:
import pyspark.ml
trunc_vec = func.udf(lambda x: x[0:n],
pyspark.ml.linalg.VectorUDT())
Итак, что я в конце концов сделал, так это сопоставил подходящую функцию с версией RDD вышеупомянутого DF и вернул ее в виде фрейма данных.Это то, что я сейчас использую:
# State columns used for grouping
idx = ['ID', 'Cycle_ID']
keep_coeffs = 30 # How many of the leading coefficients shall be kept?
from functools import partial
# To be mapped onto rdd: Return auxillary columns plus the DCT coeffs as
# individual columns, which are named serially
def truncate_dct_vec(vec, coeffs):
return tuple(vec[i] for i in idx) + tuple(vec.vec_dct.toArray()[0:coeffs+1].tolist())
truncate_dct_vec = partial(truncate_dct_vec, coeffs=keep_coeffs)
# Perform the mapping to get the truncated DCT coefficients, each in an individual column
df_dct = df_dct.rdd.map(truncate_dct_vec).toDF(idx)
Проблема в том, что это кажется очень медленным (вероятно, из-за сериализации и преобразования между JVM и python, выполняющими все эти шаги),что почти запредельно.Я в основном ищу более быстрые альтернативы.Любая помощь с этим приветствуется.