У меня есть DataFrame с идентификаторами документов doc_id
, идентификаторами строк для набора строк в каждом документе line_id
и плотным векторным представлением каждой строки vectors
.Для каждого документа (doc_id
) я хочу преобразовать набор векторов, представляющих каждую строку, в mllib.linalg.distributed.BlockMatrix
Довольно просто преобразовать векторы всего DataFrame или DataFrame, отфильтрованных по doc_id
в BlockMatrix
, сначала преобразовав векторы в СДР (numRows, numCols), DenseMatrix)
.Закодированный пример этого ниже.
Однако у меня возникли проблемы с преобразованием СДР Iterator[(numRows, numCols), DenseMatrix)]
, возвращаемого mapPartition
, который преобразовал столбец векторов для каждого раздела doc_id
, в отдельный BlockMatrix
для каждого раздела doc_id
.
Мой кластер имеет 3 рабочих узла с 16 ядрами и 62 ГБ памяти каждый.
Импорт и запуск запуска
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import VectorUDT
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg import MatrixUDT
from pyspark.mllib.linalg.distributed import BlockMatrix
spark = (
SparkSession.builder
.master('yarn')
.appName("linalg_test")
.getOrCreate()
)
Создание тестового фрейма данных
nRows = 25000
""" Create ids dataframe """
win = (W
.partitionBy(F.col('doc_id'))
.rowsBetween(W.unboundedPreceding, W.currentRow)
)
df_ids = (
spark.range(0, nRows, 1)
.withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
.withColumn('doc_id', F.floor(F.col('rand1')/3).cast(T.IntegerType()) )
.withColumn('int', F.lit(1))
.withColumn('line_id', F.sum(F.col('int')).over(win))
.select('id', 'doc_id', 'line_id')
)
""" Create vector dataframe """
df_vecSchema = T.StructType([
T.StructField('vectors', T.StructType([T.StructField('vectors', VectorUDT())] ) ),
T.StructField('id', T.LongType())
])
vecDim = 50
df_vec = (
spark.createDataFrame(
RandomRDDs.normalVectorRDD(sc, numRows=nRows, numCols=vecDim, seed=54321)
.map(lambda x: Row(vectors=Vectors.dense(x),))
.zipWithIndex(), schema=df_vecSchema)
.select('id', 'vectors.*')
)
""" Create final test dataframe """
df_SO = (
df_ids.join(df_vec, on='id', how='left')
.select('doc_id', 'line_id', 'vectors')
.orderBy('doc_id', 'line_id')
)
numDocs = df_SO.agg(F.countDistinct(F.col('doc_id'))).collect()[0][0]
# numDocs = df_SO.groupBy('doc_id').agg(F.count(F.col('line_id'))).count()
df_SO = df_SO.repartition(numDocs, 'doc_id')
Функции RDD для создания матриц из столбца Vector
def vec2mat(row):
return (
(row.line_id-1, 0),
Matrices.dense(1, vecDim, (row.vectors.toArray().tolist())), )
создание плотной матрицы изкаждый line_id
вектор
mat = df_SO.rdd.map(vec2mat)
создать распределенный BlockMatrix из RDD DenseMatrix
blk_mat = BlockMatrix(mat, 1, vecDim)
проверить вывод
blk_mat
<pyspark.mllib.linalg.distributed.BlockMatrix at 0x7fe1da370a50>
blk_mat.blocks.take(1)
[((273, 0),
DenseMatrix(1, 50, [1.749, -1.4873, -0.3473, 0.716, 2.3916, -1.5997, -1.7035, 0.0105, ..., -0.0579, 0.3074, -1.8178, -0.2628, 0.1979, 0.6046, 0.4566, 0.4063], 0))]
Проблема
Я не могу заставить работать одну и ту же вещь после преобразования каждого раздела doc_id
в mapPartitions
.Функция mapPartitions
работает, но я не могу получить преобразованный RDD в функцию BlockMatrix
.
RDD для создания плотной матрицы из каждого вектора line_id
отдельно для каждого doc_id
раздел
def vec2mat_p(iter):
yield [((row.line_id-1, 0),
Matrices.dense(1, vecDim, (row.vectors.toArray().tolist())), )
for row in iter]
создание плотной матрицы из каждого line_id
вектора отдельно для каждого doc_id
раздела
mat_doc = df_SO.rdd.mapPartitions(vec2mat_p, preservesPartitioning=True)
Проверка
mat_doc
PythonRDD[4991] at RDD at PythonRDD.scala:48
mat_test.take(1)
[[((0, 0),
DenseMatrix(1, 50, [1.814, -1.1681, -2.1887, -0.5371, -0.7509, 2.3679, 0.2795, 1.4135, ..., -0.3584, 0.5059, -0.6429, -0.6391, 0.0173, 1.2109, 1.804, -0.9402], 0)),
((1, 0),
DenseMatrix(1, 50, [0.3884, -1.451, -0.0431, -0.4653, -2.4541, 0.2396, 1.8704, 0.8471, ..., -2.5164, 0.1298, -1.2702, -0.1286, 0.9196, -0.7354, -0.1816, -0.4553], 0)),
((2, 0),
DenseMatrix(1, 50, [0.1382, 1.6753, 0.9563, -1.5251, 0.1753, 0.9822, 0.5952, -1.3924, ..., 0.9636, -1.7299, 0.2138, -2.5694, 0.1701, 0.2554, -1.4879, -1.6504], 0)),
...]]
Типы проверок
(mat_doc
.filter(lambda p: len(p) > 0)
.map(lambda mlst: [(type(m[0]), (type(m[0][0]),type(m[0][1])), type(m[1])) for m in mlst] )
.first()
)
[(tuple, (int, int), pyspark.mllib.linalg.DenseMatrix),
(tuple, (int, int), pyspark.mllib.linalg.DenseMatrix),
(tuple, (int, int), pyspark.mllib.linalg.DenseMatrix),
...]
Кажется правильным, однако, работает:
(mat_doc
.filter(lambda p: len(p) > 0)
.map(lambda mlst: [BlockMatrix((m[0], m[1])[0], 1, vecDim) for m in mlst] )
.first()
)
приводит к следующей ошибке типа:
TypeError: blocks should be an RDD of sub-matrix blocks as ((int, int), matrix) tuples, got
К сожалению, ошибка останавливается и не сообщает, что она получила'.
Кроме того, я не могу вызвать sc.parallelize()
внутри вызова map()
.
Как преобразовать каждый элемент в итераторе RDD, который mapPartitions
возвращает в RDD, который BlockMatrix
примет?