Какой самый простой способ получить Spark DataFrame из произвольных массивов данных в Scala? - PullRequest
0 голосов
/ 17 апреля 2019

Я ломал голову об этом уже пару дней. Такое чувство, что это должно быть интуитивно легко ... Действительно надеюсь, что кто-то может помочь!

Я построил org.nd4j.linalg.api.ndarray.INDArray вхождения слов из некоторых полуструктурированных данных, таких как:

import org.nd4j.linalg.factory.Nd4j
import org.nd4s.Implicits._

val docMap = collection.mutable.Map[Int,Map[Int,Int]] //of the form Map(phrase -> Map(phrasePosition -> word)
val words = ArrayBuffer("word_1","word_2","word_3",..."word_n")
val windows = ArrayBuffer("$phrase,$phrasePosition_1","$phrase,$phrasePosition_2",..."$phrase,$phrasePosition_n") 

var matrix = Nd4j.create(windows.length*words.length).reshape(windows.length,words.length)
for (row <- matrix.shape(0)){
    for(column <- matrix.shape(1){
        //+1 to (row,column) if word occurs at phrase, phrasePosition indicated by window_n.
    }
}
val finalmatrix = matrix.T.dot(matrix) // to get co-occurrence matrix

Пока все хорошо ...

После этого мне нужно интегрировать данные в существующий конвейер в Spark и использовать эту реализацию pca и т. Д., Поэтому мне нужно создать DataFrame или, по крайней мере, RDD. Если бы я знал количество слов и / или окон заранее, я мог бы сделать что-то вроде:

case class Row(window : String, word_1 : Double, word_2 : Double, ...etc)

val dfSeq = ArrayBuffer[Row]()
for (row <- matrix.shape(0)){
    dfSeq += Row(windows(row),matrix.get(NDArrayIndex.point(row), NDArrayIndex.all()))
}
sc.parallelize(dfSeq).toDF("window","word_1","word_2",...etc)

но количество окон и слов определяется во время выполнения. Я ищу WindowsxWords org.apache.spark.sql.DataFrame в качестве вывода, ввод WindowsxWords org.nd4j.linalg.api.ndarray.INDArray

Заранее благодарим за любую помощь, которую вы можете предложить.

1 Ответ

0 голосов
/ 20 апреля 2019

Хорошо, так что после нескольких дней работы выглядит простой ответ: его нет.На самом деле, похоже, что пытаться использовать Nd4j в этом контексте вообще - плохая идея по нескольким причинам:

  1. (действительно) трудно получить данные изродной INDArray формат, как только вы его вставите.
  2. Даже используя что-то вроде guava , метод .data () переносит все в кучу , что приведет кбыстро становятся дорогими.
  3. Вы столкнулись с необходимостью скомпилировать jar-файл сборки или использовать hdfs и т. д. для работы с самой библиотекой.

Я также рассмотрел вопрос об использовании Breeze , который может фактически обеспечить жизнеспособное решение, но несет некоторые из тех же проблем, и не может использоваться в распределенных структурах данных.

К сожалению, использование нативных типов данных Spark / Scala, хотя и проще, если вы знаете, как, - для кого-то вроде меня, пришедшего с Python + хотя бы с numpy + панда, - мучительно запутанно и безобразно.

Тем не менее, я успешно реализовал это решение:

import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,DenseMatrix,DenseVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

//first make a pseudo-matrix from Scala Array[Double]:
var rowSeq = Seq.fill(windows.length)(Array.fill(words.length)(0d))

//iterate through 'rows' and 'columns' to fill it:
for (row 0 until windows.length){
    for (column 0 until words.length){
        // rowSeq(row)(column) += 1 if word occurs at phrase, phrasePosition indicated by window_n.
    }
}

//create Spark DenseMatrix
val rows : Array[Double] = rowSeq.transpose.flatten.toArray
val matrix = new DenseMatrix(windows.length,words.length,rows)

Одна из основных операций, для которой мне понадобился Nd4J, была matrix.T.dot(matrix), но оказалось, что вы не можете умножить 2 матрицы типа org.apache.spark.mllib.linalg.DenseMatrix вместеодин из них (A) должен быть org.apache.spark.mllib.linalg.distributed.RowMatrix и - как вы уже догадались - вы не можете позвонить matrix.transpose() на RowMatrix, только на DenseMatrix!Поскольку это не имеет никакого отношения к вопросу, я оставлю эту часть, за исключением объяснения, что результатом этого шага является RowMatrix.Также следует отдать здесь и здесь за последнюю часть решения:

val rowMatrix : [RowMatrix] = transposeAndDotDenseMatrix(matrix)

// get DataFrame from RowMatrix via DenseMatrix
val newdense = new DenseMatrix(rowMatrix.numRows().toInt,rowMatrix.numCols().toInt,rowMatrix.rows.collect.flatMap(x => x.toArray)) // the call to collect() here is undesirable...
val matrixRows = newdense.rowIter.toSeq.map(_.toArray)
val df = spark.sparkContext.parallelize(matrixRows).toDF("Rows")

// then separate columns:
val df2 = (0 until words.length).foldLeft(df)((df, num) => 
df.withColumn(words(num), $"Rows".getItem(num)))
.drop("Rows")

Хотелось бы услышать улучшения и предложения по этому вопросу, спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...