Попытка распараллелить вложенный цикл в Scala - PullRequest
0 голосов
/ 27 мая 2019

Я сравниваю 2 кадра данных в scala / spark, используя вложенный цикл и внешний jar.

for (nrow <- dfm.rdd.collect) {   
  var mid = nrow.mkString(",").split(",")(0)
  var mfname = nrow.mkString(",").split(",")(1)
  var mlname = nrow.mkString(",").split(",")(2)  
  var mlssn = nrow.mkString(",").split(",")(3)  

  for (drow <- dfn.rdd.collect) {
    var nid = drow.mkString(",").split(",")(0)
    var nfname = drow.mkString(",").split(",")(1)
    var nlname = drow.mkString(",").split(",")(2)  
    var nlssn = drow.mkString(",").split(",")(3)  

    val fNameArray = Array(mfname,nfname)
    val lNameArray = Array (mlname,nlname)
    val ssnArray = Array (mlssn,nlssn)

    val fnamescore = Main.resultSet(fNameArray)
    val lnamescore = Main.resultSet(lNameArray)
    val ssnscore =  Main.resultSet(ssnArray)

    val overallscore = (fnamescore +lnamescore +ssnscore) /3

    if(overallscore >= .95) {
       println("MeditechID:".concat(mid)
         .concat(" MeditechFname:").concat(mfname)
         .concat(" MeditechLname:").concat(mlname)
         .concat(" MeditechSSN:").concat(mlssn)
         .concat(" NextGenID:").concat(nid)
         .concat(" NextGenFname:").concat(nfname)
         .concat(" NextGenLname:").concat(nlname)
         .concat(" NextGenSSN:").concat(nlssn)
         .concat(" FnameScore:").concat(fnamescore.toString)
         .concat(" LNameScore:").concat(lnamescore.toString)
         .concat(" SSNScore:").concat(ssnscore.toString)
         .concat(" OverallScore:").concat(overallscore.toString))
    }
  }
}

То, что я надеюсь сделать, это добавить некоторый параллелизм во внешний цикл, чтобы я мог создать пул потоков из 5 и извлечь 5 записей из коллекции externalloop и сравнить их с коллекцией внутреннего цикла, а не делаю это поочередно. Таким образом, результатом будет то, что я могу указать количество потоков, иметь 5 записей из обработки коллекции externalloop в любой момент времени для коллекции во внутреннем цикле. Как бы я поступил так?

Ответы [ 2 ]

3 голосов
/ 27 мая 2019

Давайте начнем с анализа того, что вы делаете. Вы собираете данные dfm для водителя. Затем для каждого элемента вы собираете данные из dfn, преобразуете их и вычисляете оценку для каждой пары элементов.

Это проблематично во многих отношениях. Во-первых, даже без учета параллельных вычислений преобразования элементов dfn выполняются столько же, сколько dfm элементов. Кроме того, вы собираете данные dfn для каждой строки dfm. Это много сетевых коммуникаций (между водителем и исполнителями).

Если вы хотите использовать spark для распараллеливания вычислений, вам нужно использовать API (RDD, SQL или наборы данных). Похоже, вы хотите использовать RDD для выполнения декартового произведения (это O (N * M), поэтому будьте осторожны, это может занять некоторое время).

Давайте начнем с преобразования данных перед декартовым произведением, чтобы избежать их выполнения более одного раза для каждого элемента. Также, для ясности, давайте определим класс case, содержащий ваши данные, и функцию, которая преобразует ваши фреймы данных в RDD этого класса case.

case class X(id : String, fname : String, lname : String, lssn : String)
def toRDDofX(df : DataFrame) = {
    df.rdd.map(row => {
        // using pattern matching to convert the array to the case class X
        row.mkString(",").split(",") match {
            case Array(a, b, c, d) => X(a, b, c, d)
        } 
    })
}

Затем я использую filter, чтобы сохранить только кортежи, чей счет превышает .95, но вы можете использовать map, foreach ... в зависимости от того, что вы собираетесь делать.

val rddn = toRDDofX(dfn)
val rddm = toRDDofX(dfm)
rddn.cartesian(rddm).filter{ case (xn, xm) => {
    val fNameArray = Array(xm.fname,xn.fname)
    val lNameArray = Array(xm.lname,xn.lname)
    val ssnArray = Array(xm.lssn,xn.lssn)

    val fnamescore = Main.resultSet(fNameArray)
    val lnamescore = Main.resultSet(lNameArray)
    val ssnscore =  Main.resultSet(ssnArray)

    val overallscore = (fnamescore +lnamescore +ssnscore) /3
    // and then, let's say we filter by score
    overallscore > .95
}} 
1 голос
/ 27 мая 2019

Это неправильный способ итерации по искровому фрейму данных. Основное беспокойство вызывает dfm.rdd.collect. Если размер фрейма данных произвольно большой, вы получите исключение. Это связано с тем, что функция collect по существу переносит все данные в главный узел.

Альтернативным способом будет использование конструкции foreach или map rdd.

dfm.rdd.foreach(x => {
    // your logic
}  

Теперь вы пытаетесь перебрать второй кадр данных здесь. Я боюсь, что это будет невозможно. Элегантный способ - объединить dfm и dfn и выполнить итерацию по результирующему набору данных для вычисления вашей функции.

...