Хорошо, так что после нескольких дней работы выглядит простой ответ: его нет.На самом деле, похоже, что пытаться использовать Nd4j
в этом контексте вообще - плохая идея по нескольким причинам:
- (действительно) трудно получить данные изродной
INDArray
формат, как только вы его вставите. - Даже используя что-то вроде guava , метод .data () переносит все в кучу , что приведет кбыстро становятся дорогими.
- Вы столкнулись с необходимостью скомпилировать jar-файл сборки или использовать hdfs и т. д. для работы с самой библиотекой.
Я также рассмотрел вопрос об использовании Breeze , который может фактически обеспечить жизнеспособное решение, но несет некоторые из тех же проблем, и не может использоваться в распределенных структурах данных.
К сожалению, использование нативных типов данных Spark / Scala, хотя и проще, если вы знаете, как, - для кого-то вроде меня, пришедшего с Python + хотя бы с numpy + панда, - мучительно запутанно и безобразно.
Тем не менее, я успешно реализовал это решение:
import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,DenseMatrix,DenseVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
//first make a pseudo-matrix from Scala Array[Double]:
var rowSeq = Seq.fill(windows.length)(Array.fill(words.length)(0d))
//iterate through 'rows' and 'columns' to fill it:
for (row 0 until windows.length){
for (column 0 until words.length){
// rowSeq(row)(column) += 1 if word occurs at phrase, phrasePosition indicated by window_n.
}
}
//create Spark DenseMatrix
val rows : Array[Double] = rowSeq.transpose.flatten.toArray
val matrix = new DenseMatrix(windows.length,words.length,rows)
Одна из основных операций, для которой мне понадобился Nd4J, была matrix.T.dot(matrix)
, но оказалось, что вы не можете умножить 2 матрицы типа org.apache.spark.mllib.linalg.DenseMatrix
вместеодин из них (A) должен быть org.apache.spark.mllib.linalg.distributed.RowMatrix
и - как вы уже догадались - вы не можете позвонить matrix.transpose()
на RowMatrix
, только на DenseMatrix
!Поскольку это не имеет никакого отношения к вопросу, я оставлю эту часть, за исключением объяснения, что результатом этого шага является RowMatrix
.Также следует отдать здесь и здесь за последнюю часть решения:
val rowMatrix : [RowMatrix] = transposeAndDotDenseMatrix(matrix)
// get DataFrame from RowMatrix via DenseMatrix
val newdense = new DenseMatrix(rowMatrix.numRows().toInt,rowMatrix.numCols().toInt,rowMatrix.rows.collect.flatMap(x => x.toArray)) // the call to collect() here is undesirable...
val matrixRows = newdense.rowIter.toSeq.map(_.toArray)
val df = spark.sparkContext.parallelize(matrixRows).toDF("Rows")
// then separate columns:
val df2 = (0 until words.length).foldLeft(df)((df, num) =>
df.withColumn(words(num), $"Rows".getItem(num)))
.drop("Rows")
Хотелось бы услышать улучшения и предложения по этому вопросу, спасибо.