Функции секвенирования PySpark - PullRequest
0 голосов
/ 18 сентября 2018

У меня проблемы с пониманием типов возвращаемых искровых файлов. У меня есть два pyspark dataframes.

DF1 содержит список идентификаторов в каждой строке, например,

+-------------+
|seq          |
+-------------+
|[1, 2, 3, 4]|
|[5, 6, 7]   |

DF2 имеет закодированные функции, собранные в SparseVector, назначенные каждому уникальному идентификатору, например,

+--------+--------------------+
|ID|            features|
+--------+--------------------+
|    1|(67,[2,36,42,46,5...|
|    2|(67,[4,36,42,46,5...|

Теперь моя цель - передать данные в LSTM в форме (N-samples, sequence_length, n_features). В настоящее время я преобразовываю разреженный вектор в плотный, а затем использую udf, но это стало невозможным из-за большего количества функций. Затем я попытался использовать разреженный формат для экономии памяти:

import pyspark.sql.functions as fun
from pyspark.mllib.linalg import VectorUDT
features = spark.sparkContext.broadcast(
    df2.rdd.collectAsMap())

def add_features(s):
    res = []
    for each in s:
        res += [features.value.get(each)]
    return res


features_udf = fun.udf(add_features, returnType=ArrayType(VectorUDT()))

df1.withColumn('seq_features', features_udf('seq')) \
    .write.mode('overwrite').parquet(path+ 'data_df')

Однако я получаю ошибку:

Py4JJavaError: An error occurred while calling o93.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)

TLDR : мне нужны данные в трехмерной форме. Строки представляют количество последовательностей, столбцы представляют количество элементов в каждой последовательности (переменная длина), а третье затемнение представляет закодированные объекты в разреженном или плотном формате. У меня есть один фрейм данных, указывающий последовательность идентификаторов в каждой строке, и один ID фрейма данных, отображающий объекты. Мне нужно оптимально объединить эти два кадра данных, так как данные очень большие.

Результат, который я ищу:

+---------------------------------------------+
|seq                                          |
+---------------------------------------------+
|[[features1],[features2],[features3], [features4]]|
|[[features5], [features6], [features7]]   |

Спасибо за любые предложения и помощь.

EDIT : ошибка Py4JJavaError заключается в том, что вместо этого я должен использовать VectorUDT из pyspark.ml.linalg ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...