Переформатирование массива данных, содержащего массив, в RowMatrix - PullRequest
0 голосов
/ 09 мая 2019

У меня есть этот фрейм данных в следующем формате:

+----+-----+
| features |
+----+-----+
|[1,4,7,10]|
|[2,5,8,11]|
|[3,6,9,12]|
+----+----+

Скрипт для создания образца фрейма данных:

rows2 = sc.parallelize([ IndexedRow(0, [1, 4, 7, 10 ]),
                         IndexedRow(1, [2, 5, 8, 1]),
                         IndexedRow(1, [3, 6, 9, 12]),
                                   ])
rows_df = rows2.toDF()
row_vec= rows_df.drop("index")
row_vec.show()

Столбец объектов содержит 4 объекта и 3 идентификатора строки.Я хочу преобразовать эти данные в матрицу строк, где столбцы и строки будут иметь следующий формат mat:

from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)])

# Convert to RowMatrix
mat = RowMatrix(rows)

# Calculate exact and approximate similarities
exact = mat.columnSimilarities()
approx = mat.columnSimilarities(0.05) 

По сути, я хочу переместить кадр данных в новый формат, чтобы я мог запуститьФункция columnSimilities ().У меня есть гораздо больший массив данных, который содержит 50 объектов и 39000 строк.

Ответы [ 2 ]

0 голосов
/ 10 мая 2019

Я понял, я использовал следующее:

from pyspark.mllib.linalg.distributed import RowMatrix


features_rdd = row_vec.select("features").rdd.map(lambda row: row[0])

features_mat = RowMatrix(features_rdd )

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

coordmatrix_features  = CoordinateMatrix(
        features_mat .rows.zipWithIndex().flatMap(
        lambda x: [MatrixEntry(x[1], j, v) for j, v in enumerate(x[0])]
        )
         )
transposed_rowmatrix_features = coordmatrix_features.transpose().toRowMatrix()
0 голосов
/ 09 мая 2019

Это то, что вы пытаетесь сделать? Ненавижу использовать collect(), но не думаю, что этого можно избежать здесь, так как вы хотите преобразовать / преобразовать структурированный объект в матрицу ... верно?

X = np.array(row_vec.select("_2").collect()).reshape(-1,3)
X = sc.parallelize(X)
for i in X.collect(): print(i)
[1 4 7]
[10  2  5]
[8 1 3]
[ 6  9 12]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...