Давайте начнем с анализа того, что вы делаете. Вы собираете данные dfm
для водителя. Затем для каждого элемента вы собираете данные из dfn
, преобразуете их и вычисляете оценку для каждой пары элементов.
Это проблематично во многих отношениях. Во-первых, даже без учета параллельных вычислений преобразования элементов dfn
выполняются столько же, сколько dfm
элементов. Кроме того, вы собираете данные dfn
для каждой строки dfm
. Это много сетевых коммуникаций (между водителем и исполнителями).
Если вы хотите использовать spark для распараллеливания вычислений, вам нужно использовать API (RDD, SQL или наборы данных). Похоже, вы хотите использовать RDD для выполнения декартового произведения (это O (N * M), поэтому будьте осторожны, это может занять некоторое время).
Давайте начнем с преобразования данных перед декартовым произведением, чтобы избежать их выполнения более одного раза для каждого элемента. Также, для ясности, давайте определим класс case, содержащий ваши данные, и функцию, которая преобразует ваши фреймы данных в RDD этого класса case.
case class X(id : String, fname : String, lname : String, lssn : String)
def toRDDofX(df : DataFrame) = {
df.rdd.map(row => {
// using pattern matching to convert the array to the case class X
row.mkString(",").split(",") match {
case Array(a, b, c, d) => X(a, b, c, d)
}
})
}
Затем я использую filter
, чтобы сохранить только кортежи, чей счет превышает .95
, но вы можете использовать map
, foreach
... в зависимости от того, что вы собираетесь делать.
val rddn = toRDDofX(dfn)
val rddm = toRDDofX(dfm)
rddn.cartesian(rddm).filter{ case (xn, xm) => {
val fNameArray = Array(xm.fname,xn.fname)
val lNameArray = Array(xm.lname,xn.lname)
val ssnArray = Array(xm.lssn,xn.lssn)
val fnamescore = Main.resultSet(fNameArray)
val lnamescore = Main.resultSet(lNameArray)
val ssnscore = Main.resultSet(ssnArray)
val overallscore = (fnamescore +lnamescore +ssnscore) /3
// and then, let's say we filter by score
overallscore > .95
}}