Поток выполнения при использовании ForEach в Spark / Scala - PullRequest
1 голос
/ 04 октября 2019

У меня странная проблема с потоком выполнения в кластере.

Вызов метода A - Метод B, который в вызове FOREACH - Метод C

Поток выполнения должен быть

Method A --> Method B --> Method C 

, но работает так:

1) Method A --> Method B (skips Method C) and continues to rest of Method B.
2) Method C is executed later separately. 

Поскольку поток не в правильном потоке, accum1.value в методе B отображается как blank/null.

**CLASS A::METHOD A:**

object TakeDFs {

    def takeDFs(df: DataFrame): Unit = {
        println("---------------- takeNettedDFs::START ---------------- ")

        for(i <- 0 until bySecurityArray.length) {
            allocProcessDF = bySecurityArray(i).toDF()
            ....

            //WORKS
            AllocOneProcess.getAllocOneDFs(allocProcessDF)

            }
        println("---------------- takeNettedDFs::END ---------------- ")

    }
}

**CLASS B::METHOD B:**

object AllocOneProcess {

    def getAllocOneDFs(df: DataFrame): Unit = {
        println("---------------- getAllocOneDFs::START ---------------- ")

        df.coalesce(1).sort($"PRIORITY" asc).foreach( {
        row => AllocOneTest.allocProcessTest(row)
        })

        println("------------- getAllocOneDFs::accum1.value -------------" + accum1.value)

        println("---------------- getAllocOneDFs::END ---------------- ")

    }
}

**CLASS C::METHOD C:**

object AllocOneTest {

    def allocProcessTest(row: Row): Unit =  {
        println("---------------- AllocOneTest::allocProcessTest::START ---------------- ")

        accum1.add(RegRptPilotConstants.PairProcessCaseClass(row(0).asInstanceOf[String], row(1).asInstanceOf[String], row(2).asInstanceOf[String]))


        println("---------------- AllocationOneTest::allocProcessTest::END ---------------- ")

    }
}

**CLASS D::**

object RegRptPilotConstants {
    var pairedOneSeq = Seq[PairProcessCaseClass]()
    val accum1 = new ProcessAccumulator[ProcessCaseClass]()

}

1 Ответ

0 голосов
/ 04 октября 2019

Для приведенного выше кода foreach - это действие, которое запускает Spark DAG для выполнения всего потока. Действие, т.е. foreach в вышеприведенном сценарии, выполняется / вызывается на каждом из разделов одновременно параллельно, таким образом, вызывая методы, которые, наконец, находятся внутри foreach.

Поток: метод A -> метод B -> Метод C, Метод C, Метод C ...

df.coalesce(1).sort($"PRIORITY" asc).foreach( {
    row => AllocOneTest.allocProcessTest(row)
})

Ссылка: https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

...