Следующий воспроизводимый код делает то, что я хочу, но медленно.Я не уверен, правильно ли я запускаю функцию map_simScore()
, чтобы получить правильный уровень параллелизма.
Инициализация тестового DataFrame с spark.range(0, 25000, 1)
приводит к созданию DataFrame с 76 МБ, распределенного по 3 разделам.
Мой кластер имеет 3 рабочих узла с 16 ядрами и 62 ГБ памяти каждый.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import Word2Vec
spark = (
SparkSession.builder
.master('yarn')
.appName("linalg_test")
.getOrCreate()
)
placeholder = (
r"Lorem ipsum dolor sit amet consectetur adipiscing elit "
r"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua "
r"Ut enim ad minim veniam quis nostrud exercitation ullamco laboris "
r"nisi ut aliquip ex ea commodo consequat Duis aute irure dolor in "
r"reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
r"pariatur Excepteur sint occaecat cupidatat non proident sunt in "
r"culpa qui officia deserunt mollit anim id est laborum"
)
win = (
W.partitionBy(F.col('doc_id'))
.rowsBetween(W.unboundedPreceding, W.currentRow)
)
df_SO = (
spark.range(0, 25000, 1)
.withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
.withColumn('doc_id', F.floor(F.col('rand1')/4) )
.withColumn('int', F.lit(1))
.withColumn('line_id', F.sum('int').over(win))
.withColumn('rand2', (F.rand(seed=54321) * 50).cast(T.IntegerType()))
.withColumn('rand3', (F.rand(seed=51432) * 100).cast(T.IntegerType()))
.withColumn('text', F.lit(placeholder))
.withColumn('text', F.expr("substring(text, rand2, rand3)" ))
.withColumn('text', F.split(F.col('text'), ' '))
.where(F.col('rand2') > 3)
.select('doc_id', 'line_id', 'text')
)
word2Vec = (
Word2Vec()
.setInputCol("text")
.setOutputCol("vector")
.setMinCount(1)
.setNumPartitions(5)
.setStepSize(0.1)
.setWindowSize(10)
.setVectorSize(400)
.setMaxSentenceLength(1)
)
model_SO = word2Vec.fit(df_SO)
df_SO2 = model_SO.transform(df_SO)
df_SO2.rdd.getNumPartitions()
df_SO2 = df_SO2.repartition(3, 'doc_id')
spark.catalog.clearCache()
df_SO2.createOrReplaceTempView("df_SO2")
spark.catalog.cacheTable("df_SO2")
df_SO2.limit(1).count()
doc_ids = (
df_SO
.groupBy('doc_id')
.agg(F.count(F.col('doc_id')).alias('numLines') )
.toPandas()
)
def map_simScore(id):
dftmp = df_SO2.filter(F.col('doc_id') == id)
dfcnt = float(doc_ids[doc_ids.doc_id.eq(id)]['numLines'].values[0])
stats = (
IndexedRowMatrix(dftmp.rdd.map(lambda row: (row.line_id, row.vector.toArray())))
.toBlockMatrix()
.transpose()
.toIndexedRowMatrix()
.columnSimilarities()
.toRowMatrix()
.computeColumnSummaryStatistics()
)
SimScore = len(stats.max()[np.where(stats.max() > 0.8)]) / dfcnt
return (id, SimScore)
doc_ids.doc_id.map(map_invId_simScore)
Также попытался выполнить следующее, что дает те же результаты, но не обязательно работаетбыстрее и также инициируется таким же образом, что может быть проблемой.
def map_simScore2(id):
dftmp = df_SO2.filter(F.col('doc_id') == id)
dfcnt = float(doc_ids[doc_ids.doc_id.eq(id)]['numLines'].values[0])
SimScore = (
spark.createDataFrame(
IndexedRowMatrix(dftmp.rdd.map(lambda row: (row.line_id, row.vector.toArray())))
.toBlockMatrix()
.transpose()
.toIndexedRowMatrix()
.columnSimilarities()
.entries)
.filter(F.col('value') > 0.80)
.groupBy('j')
.agg(F.count('j'))
.count() / dfcnt
)
return (id, SimScore)
doc_ids.doc_id.map(map_invId_simScore2)