В следующем коде гарантированно выполняется цикл BLOCK 2
только после того, как все задачи исполнителя, порожденные BLOCK 1
, завершены, или возможно ли его запустить, пока некоторые из задач исполнителя все еще выполняются?
Если возможно одновременное выполнение двух блоков, каков наилучший способ предотвратить это?Мне нужно обработать содержимое аккумулятора, но только после того, как все исполнители закончили.
При запуске с главным URL-адресом local[4]
, как показано, похоже, что BLOCK 2
ожидает BLOCK 1
дляконец, однако я вижу ошибки при работе с URL-адресом yarn
, которые указывают на то, что BLOCK 2
выполняется одновременно с задачами исполнителя в BLOCK 1
object Main {
def main(args: Array[String]) {
val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
val myRecordAccumulator = sparkContext.collectionAccumulator[MyRecord]
// BLOCK 1
sparkContext.binaryFiles("/my/files/here").foreach(_ => {
for(i <- 1 to 100000) {
val record = buildRecord()
myRecordAccumulator.add(record)
}
})
// BLOCK 2
myRecordAccumulator.value.asScala.foreach((record: MyRecord) => {
processIt(record)
})
}
}