Получить функцию для параллельного запуска с pyspark.mllib.linalg.distributed matrix - PullRequest
0 голосов
/ 04 июня 2019

Следующий воспроизводимый код делает то, что я хочу, но медленно.Я не уверен, правильно ли я запускаю функцию map_simScore(), чтобы получить правильный уровень параллелизма.

Инициализация тестового DataFrame с spark.range(0, 25000, 1) приводит к созданию DataFrame с 76 МБ, распределенного по 3 разделам.

Мой кластер имеет 3 рабочих узла с 16 ядрами и 62 ГБ памяти каждый.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import Word2Vec    

spark = (
    SparkSession.builder
    .master('yarn')
    .appName("linalg_test")
    .getOrCreate()
)    

placeholder = (
    r"Lorem ipsum dolor sit amet consectetur adipiscing elit "
    r"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua "
    r"Ut enim ad minim veniam quis nostrud exercitation ullamco laboris "
    r"nisi ut aliquip ex ea commodo consequat Duis aute irure dolor in "
    r"reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
    r"pariatur Excepteur sint occaecat cupidatat non proident sunt in "
    r"culpa qui officia deserunt mollit anim id est laborum"
)

win = (
    W.partitionBy(F.col('doc_id'))
    .rowsBetween(W.unboundedPreceding, W.currentRow)
)

df_SO = (
    spark.range(0, 25000, 1)
    .withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
    .withColumn('doc_id', F.floor(F.col('rand1')/4) )
    .withColumn('int', F.lit(1))
    .withColumn('line_id', F.sum('int').over(win))
    .withColumn('rand2', (F.rand(seed=54321) * 50).cast(T.IntegerType()))
    .withColumn('rand3', (F.rand(seed=51432) * 100).cast(T.IntegerType()))
    .withColumn('text', F.lit(placeholder))
    .withColumn('text', F.expr("substring(text, rand2, rand3)" ))
    .withColumn('text', F.split(F.col('text'), ' '))
    .where(F.col('rand2') > 3)
    .select('doc_id', 'line_id', 'text')
)       

word2Vec = (
    Word2Vec()
    .setInputCol("text")
    .setOutputCol("vector")
    .setMinCount(1)
    .setNumPartitions(5)
    .setStepSize(0.1)
    .setWindowSize(10)
    .setVectorSize(400)
    .setMaxSentenceLength(1)
)

model_SO = word2Vec.fit(df_SO)

df_SO2 = model_SO.transform(df_SO)


df_SO2.rdd.getNumPartitions()

df_SO2 = df_SO2.repartition(3, 'doc_id')

spark.catalog.clearCache()
df_SO2.createOrReplaceTempView("df_SO2")
spark.catalog.cacheTable("df_SO2")
df_SO2.limit(1).count()


doc_ids = (
    df_SO
    .groupBy('doc_id')
    .agg(F.count(F.col('doc_id')).alias('numLines') )
    .toPandas()
)


def map_simScore(id):
    dftmp = df_SO2.filter(F.col('doc_id') == id)
    dfcnt = float(doc_ids[doc_ids.doc_id.eq(id)]['numLines'].values[0])

    stats = (
        IndexedRowMatrix(dftmp.rdd.map(lambda row: (row.line_id, row.vector.toArray())))
        .toBlockMatrix()
        .transpose()
        .toIndexedRowMatrix()
        .columnSimilarities()
        .toRowMatrix()
        .computeColumnSummaryStatistics()
    )

    SimScore = len(stats.max()[np.where(stats.max() > 0.8)]) / dfcnt

    return (id, SimScore)


doc_ids.doc_id.map(map_invId_simScore)

Также попытался выполнить следующее, что дает те же результаты, но не обязательно работаетбыстрее и также инициируется таким же образом, что может быть проблемой.

def map_simScore2(id):
    dftmp = df_SO2.filter(F.col('doc_id') == id)
    dfcnt = float(doc_ids[doc_ids.doc_id.eq(id)]['numLines'].values[0])

    SimScore = (
        spark.createDataFrame(
            IndexedRowMatrix(dftmp.rdd.map(lambda row: (row.line_id, row.vector.toArray())))
            .toBlockMatrix()
            .transpose()
            .toIndexedRowMatrix()
            .columnSimilarities()
            .entries)
        .filter(F.col('value') > 0.80)
        .groupBy('j')
        .agg(F.count('j'))
        .count() / dfcnt
    )

    return (id, SimScore)


doc_ids.doc_id.map(map_invId_simScore2)

1 Ответ

0 голосов
/ 09 июня 2019

Следуя среднему посту Бена Вебера " 3 Методы распараллеливания в Spark , хорошо работает следующее.

убедитесь, что 'spark.scheduler.mode','FAIR' установлено в вашей SparkSession.

Например:

spark = (
    SparkSession.builder
    .master('yarn')
    .appName("linalg_test")
    .config('spark.scheduler.mode','FAIR')
    .getOrCreate()
)

Добавьте следующий импорт.

from multiprocessing.pool import ThreadPool

Установите количество одновременных потоков, чтобы разрешить (в данном случае 5).

pool = ThreadPool(5)

Чтобы запустить код в исходном посте, я сделал следующее.

idList = doc_ids.doc_id.to_list()

pool.map(lambda id: map_simScore(id), idList)

Я также нашел сообщение Medium от rbahaguejr " Threaded Tasks в заданиях PySpark ", в котором показано, как сделать то же самое с помощью пакета threading, но я не пробовал.

...