У меня странная проблема с потоком выполнения в кластере.
Вызов метода A - Метод B, который в вызове FOREACH - Метод C
Поток выполнения должен быть
Method A --> Method B --> Method C
, но работает так:
1) Method A --> Method B (skips Method C) and continues to rest of Method B.
2) Method C is executed later separately.
Поскольку поток не в правильном потоке, accum1.value
в методе B отображается как blank/null
.
**CLASS A::METHOD A:**
object TakeDFs {
def takeDFs(df: DataFrame): Unit = {
println("---------------- takeNettedDFs::START ---------------- ")
for(i <- 0 until bySecurityArray.length) {
allocProcessDF = bySecurityArray(i).toDF()
....
//WORKS
AllocOneProcess.getAllocOneDFs(allocProcessDF)
}
println("---------------- takeNettedDFs::END ---------------- ")
}
}
**CLASS B::METHOD B:**
object AllocOneProcess {
def getAllocOneDFs(df: DataFrame): Unit = {
println("---------------- getAllocOneDFs::START ---------------- ")
df.coalesce(1).sort($"PRIORITY" asc).foreach( {
row => AllocOneTest.allocProcessTest(row)
})
println("------------- getAllocOneDFs::accum1.value -------------" + accum1.value)
println("---------------- getAllocOneDFs::END ---------------- ")
}
}
**CLASS C::METHOD C:**
object AllocOneTest {
def allocProcessTest(row: Row): Unit = {
println("---------------- AllocOneTest::allocProcessTest::START ---------------- ")
accum1.add(RegRptPilotConstants.PairProcessCaseClass(row(0).asInstanceOf[String], row(1).asInstanceOf[String], row(2).asInstanceOf[String]))
println("---------------- AllocationOneTest::allocProcessTest::END ---------------- ")
}
}
**CLASS D::**
object RegRptPilotConstants {
var pairedOneSeq = Seq[PairProcessCaseClass]()
val accum1 = new ProcessAccumulator[ProcessCaseClass]()
}