Оптимизация совокупного суммирования разреженного вектора в Spark (и сохранение в паркете) - PullRequest
0 голосов
/ 28 февраля 2020

Прошу прощения за вопрос Pyspark NOOB.

Мой последний этап создания кадра данных Spark в PySpark следующий:

indexer = StringIndexer(inputCol="kpID", outputCol="KPindex")
inputs = [indexer.getOutputCol()]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["KPvec"])
pipeline = Pipeline(stages=[indexer, encoder])
df_bayes = pipeline.fit(df_bayes).transform(df_bayes)

def sparse_to_array(v):
    v = DenseVector(v)
    new_array = list([float(x) for x in v])
    return new_array

sparse_to_array_udf = F.udf(sparse_to_array, ArrayType(FloatType()))
df_bayes = df_bayes.select('id',sparse_to_array_udf(col('KPvec')).alias('KPvec'))
df_bayes = df_bayes.repartition(15000,col('id'))
df_bayes = df_bayes.select('id','KPvec').groupby('id').agg(F.array(*[F.sum(F.col('KPvec')[i]) for i in range(len(kids))]).alias("KPvec")).cache()

Я пытаюсь агрегировать-суммировать разреженный вектор, который представляет собой одну горячую закодированную категориальную переменную.

В моем кластере EMR это занимает 188 секунд. И итоговый кадр данных имеет ~ 50 миллионов строк. Затем я пытаюсь записать этот фрейм данных в паркет.

Я пробовал:

df_bayes.write.format("parquet") \
.partitionBy("id") \
.bucketBy(500,"KPvec") \
.option("path", "s3://..."+"output.parquet") \
.saveAsTable("output")

И:

df_bayes.write.repartition(1500,col('id')).parquet("s3://..."+"output.parquet")

И без переразметки.

В каждом случае задание занимает очень много времени и в итоге завершается неудачей с ExecutorLostFailure (что связано с выполнением EMR со многими точечными экземплярами).

Here is the Spark DAG Visualization

Несмотря на кэширование ранее, я подозреваю, что многие из этих шагов на самом деле не относятся к написанию паркета, а скорее к шагам вычисления I запросили.

Я подозреваю, что это так, потому что, если я пытаюсь вычислить размеры кадра данных, я вижу, что визуализация DAG:

ag

Повторяющиеся шаги и ~ 6 ГБ случайной записи до того, как задание не выполнено, указывают мне на большую неэффективность моих вычислений.

Кроме того, когда я запускаю explain, я получаю следующее:

== Physical Plan ==
InMemoryTableScan [id#1, KPvec#52167]
   +- InMemoryRelation [id#1, KPvec#52167], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- HashAggregate(keys=[id#1], functions=[sum(cast(KPvec#27[0] as double)), sum(cast(KPvec#27[1] as double)), sum(cast(KPvec#27[2] as double)), sum(cast(KPvec#27[3] as double)), sum(cast(KPvec#27[4] as double)), sum(cast(KPvec#27[5] as double)), sum(cast(KPvec#27[6] as double)), sum(cast(KPvec#27[7] as double)), sum(cast(KPvec#27[8] as double)), sum(cast(KPvec#27[9] as double)), sum(cast(KPvec#27[10] as double)), sum(cast(KPvec#27[11] as double)), sum(cast(KPvec#27[12] as double)), sum(cast(KPvec#27[13] as double)), sum(cast(KPvec#27[14] as double)), sum(cast(KPvec#27[15] as double)), sum(cast(KPvec#27[16] as double)), sum(cast(KPvec#27[17] as double)), sum(cast(KPvec#27[18] as double)), sum(cast(KPvec#27[19] as double)), sum(cast(KPvec#27[20] as double)), sum(cast(KPvec#27[21] as double)), sum(cast(KPvec#27[22] as double)), sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
            +- HashAggregate(keys=[id#1], functions=[partial_sum(cast(KPvec#27[0] as double)), partial_sum(cast(KPvec#27[1] as double)), partial_sum(cast(KPvec#27[2] as double)), partial_sum(cast(KPvec#27[3] as double)), partial_sum(cast(KPvec#27[4] as double)), partial_sum(cast(KPvec#27[5] as double)), partial_sum(cast(KPvec#27[6] as double)), partial_sum(cast(KPvec#27[7] as double)), partial_sum(cast(KPvec#27[8] as double)), partial_sum(cast(KPvec#27[9] as double)), partial_sum(cast(KPvec#27[10] as double)), partial_sum(cast(KPvec#27[11] as double)), partial_sum(cast(KPvec#27[12] as double)), partial_sum(cast(KPvec#27[13] as double)), partial_sum(cast(KPvec#27[14] as double)), partial_sum(cast(KPvec#27[15] as double)), partial_sum(cast(KPvec#27[16] as double)), partial_sum(cast(KPvec#27[17] as double)), partial_sum(cast(KPvec#27[18] as double)), partial_sum(cast(KPvec#27[19] as double)), partial_sum(cast(KPvec#27[20] as double)), partial_sum(cast(KPvec#27[21] as double)), partial_sum(cast(KPvec#27[22] as double)), partial_sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
               +- Exchange hashpartitioning(id#1, 15000)
                  +- *(2) Project [id#1, pythonUDF0#52170 AS KPvec#27]
                     +- BatchEvalPython [sparse_to_array(KPvec#3)], [KPvec#3, id#1, pythonUDF0#52170]
                        +- *(1) Project [KPvec#3, id#1]
                           +- *(1) FileScan parquet [id#1,KPvec#3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://...bayesnetw..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,KPvec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>

Может кто-нибудь указать мне, что я делаю здесь неправильно?

Заранее спасибо.

1 Ответ

0 голосов
/ 06 марта 2020

Итак, чтобы ответить на мой собственный вопрос, если он кому-то поможет, решение состоит в том, чтобы собрать_установить функции, которые вы хотите кодировать одним щелчком, а затем использовать CountVectorizor вместо OneHotEncoder Spark ML.

df.select('id','feature').groupby('id').agg(F.collect_set('feature').alias('feature'))

countModel = CountVectorizer().setInputCol("feature").setOutputCol("feature_vec").fit(df)
df = countModel.transform(df).select('id','KPvec')

Тогда вы можете просто сохранить его на паркете. Для меня это довольно быстро.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...