Как отобразить функцию RDD на каждый RDD в итераторе, возвращаемом mapPartitions - PullRequest
1 голос
/ 08 июня 2019

У меня есть DataFrame с идентификаторами документов doc_id, идентификаторами строк для набора строк в каждом документе line_id и плотным векторным представлением каждой строки vectors.Для каждого документа (doc_id) я хочу преобразовать набор векторов, представляющих каждую строку, в mllib.linalg.distributed.BlockMatrix

Довольно просто преобразовать векторы всего DataFrame или DataFrame, отфильтрованных по doc_id в BlockMatrix, сначала преобразовав векторы в СДР (numRows, numCols), DenseMatrix).Закодированный пример этого ниже.

Однако у меня возникли проблемы с преобразованием СДР Iterator[(numRows, numCols), DenseMatrix)], возвращаемого mapPartition, который преобразовал столбец векторов для каждого раздела doc_id, в отдельный BlockMatrix для каждого раздела doc_id.

Мой кластер имеет 3 рабочих узла с 16 ядрами и 62 ГБ памяти каждый.


Импорт и запуск запуска

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import VectorUDT
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg import MatrixUDT
from pyspark.mllib.linalg.distributed import BlockMatrix

spark = (
    SparkSession.builder
    .master('yarn')
    .appName("linalg_test")
    .getOrCreate()
) 

Создание тестового фрейма данных

nRows = 25000

""" Create ids dataframe """
win = (W
    .partitionBy(F.col('doc_id'))    
    .rowsBetween(W.unboundedPreceding, W.currentRow)
)

df_ids = (
    spark.range(0, nRows, 1)
    .withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
    .withColumn('doc_id', F.floor(F.col('rand1')/3).cast(T.IntegerType()) )
    .withColumn('int', F.lit(1))
    .withColumn('line_id', F.sum(F.col('int')).over(win))
    .select('id', 'doc_id', 'line_id')
)

""" Create vector dataframe """
df_vecSchema = T.StructType([
    T.StructField('vectors', T.StructType([T.StructField('vectors', VectorUDT())] ) ), 
    T.StructField('id', T.LongType()) 
])

vecDim = 50
df_vec = (
    spark.createDataFrame(
        RandomRDDs.normalVectorRDD(sc, numRows=nRows, numCols=vecDim, seed=54321)
        .map(lambda x: Row(vectors=Vectors.dense(x),))
        .zipWithIndex(), schema=df_vecSchema)
    .select('id', 'vectors.*')
)

""" Create final test dataframe """
df_SO = (
    df_ids.join(df_vec, on='id', how='left')
    .select('doc_id', 'line_id', 'vectors')
    .orderBy('doc_id', 'line_id')
)

numDocs = df_SO.agg(F.countDistinct(F.col('doc_id'))).collect()[0][0]
# numDocs = df_SO.groupBy('doc_id').agg(F.count(F.col('line_id'))).count()

df_SO = df_SO.repartition(numDocs, 'doc_id')

Функции RDD для создания матриц из столбца Vector

def vec2mat(row):
    return ( 
        (row.line_id-1, 0), 
        Matrices.dense(1, vecDim, (row.vectors.toArray().tolist())), )

создание плотной матрицы изкаждый line_id вектор

mat = df_SO.rdd.map(vec2mat)

создать распределенный BlockMatrix из RDD DenseMatrix

blk_mat = BlockMatrix(mat, 1, vecDim)

проверить вывод

blk_mat
<pyspark.mllib.linalg.distributed.BlockMatrix at 0x7fe1da370a50>
blk_mat.blocks.take(1)
[((273, 0),
  DenseMatrix(1, 50, [1.749, -1.4873, -0.3473, 0.716, 2.3916, -1.5997, -1.7035, 0.0105, ..., -0.0579, 0.3074, -1.8178, -0.2628, 0.1979, 0.6046, 0.4566, 0.4063], 0))]

Проблема

Я не могу заставить работать одну и ту же вещь после преобразования каждого раздела doc_id в mapPartitions.Функция mapPartitions работает, но я не могу получить преобразованный RDD в функцию BlockMatrix.

RDD для создания плотной матрицы из каждого вектора line_id отдельно для каждого doc_id раздел

def vec2mat_p(iter):
    yield [((row.line_id-1, 0), 
            Matrices.dense(1, vecDim, (row.vectors.toArray().tolist())), )
        for row in iter]

создание плотной матрицы из каждого line_id вектора отдельно для каждого doc_id раздела

mat_doc = df_SO.rdd.mapPartitions(vec2mat_p, preservesPartitioning=True)

Проверка

mat_doc 
PythonRDD[4991] at RDD at PythonRDD.scala:48
mat_test.take(1)
[[((0, 0),
   DenseMatrix(1, 50, [1.814, -1.1681, -2.1887, -0.5371, -0.7509, 2.3679, 0.2795, 1.4135, ..., -0.3584, 0.5059, -0.6429, -0.6391, 0.0173, 1.2109, 1.804, -0.9402], 0)),
  ((1, 0),
   DenseMatrix(1, 50, [0.3884, -1.451, -0.0431, -0.4653, -2.4541, 0.2396, 1.8704, 0.8471, ..., -2.5164, 0.1298, -1.2702, -0.1286, 0.9196, -0.7354, -0.1816, -0.4553], 0)),
  ((2, 0),
   DenseMatrix(1, 50, [0.1382, 1.6753, 0.9563, -1.5251, 0.1753, 0.9822, 0.5952, -1.3924, ..., 0.9636, -1.7299, 0.2138, -2.5694, 0.1701, 0.2554, -1.4879, -1.6504], 0)),
  ...]]

Типы проверок

(mat_doc 
    .filter(lambda p: len(p) > 0)
    .map(lambda mlst: [(type(m[0]), (type(m[0][0]),type(m[0][1])), type(m[1])) for m in mlst] )
    .first()
)
[(tuple, (int, int), pyspark.mllib.linalg.DenseMatrix),
 (tuple, (int, int), pyspark.mllib.linalg.DenseMatrix),
 (tuple, (int, int), pyspark.mllib.linalg.DenseMatrix),
 ...]

Кажется правильным, однако, работает:

(mat_doc 
    .filter(lambda p: len(p) > 0)
    .map(lambda mlst: [BlockMatrix((m[0], m[1])[0], 1, vecDim) for m in mlst] )
    .first()
)

приводит к следующей ошибке типа:

TypeError: blocks should be an RDD of sub-matrix blocks as ((int, int), matrix) tuples, got 

К сожалению, ошибка останавливается и не сообщает, что она получила'.

Кроме того, я не могу вызвать sc.parallelize() внутри вызова map().

Как преобразовать каждый элемент в итераторе RDD, который mapPartitions возвращает в RDD, который BlockMatrix примет?

...