Моя программа spark отправляет задачи в пул потоков в очереди синхронной блокировки. Начало следующей задачи зависит от результата предыдущей задачи. Одна базовая задача состоит из этапа карты и этапа reduByKey.
Этап карты преобразует массив rdd в пару ключ-значение rdd. Этап reduByKey выполняет задачу сопоставления в соответствии с парой ключ-значение. Если ключ двух строк одинаков, но первый бит значения отличается, MyException будет выброшено, и исключение будет перехвачено в текущий поток, возвращающий значение и заканчивающий поток и отправляющий следующую задачу; если lowerByKey обходит все данные и не находит подходящую строку, он не будет генерировать MyException. Я использую счетчик для запуска двух операций преобразования, описанных выше.
Согласно моему предыдущему пониманию разбиения на этапе искры, операция reduBykey должна ждать, пока операция карты не будет выполнена, но я вижу в веб-интерфейсе, что MyException также будет генерироваться на некоторых этапах карты. Итак, я хочу знать:
- Этап карты и этап reduByKey выполняются синхронно, или существует параллельная ситуация?
- Мой код доступен в этом методе, будет ли ошибка? Например, следующий поток обнаружил исключение MyException, выброшенное в предыдущем задании.
//source rdd looks like this
val seq1=Seq((1,2,3),(4,5,6),(7,8,9),.....)
val rdd1=sc.parallelize(seq1)
//Single compute thread
class MyException extends Exception
class CalThread(lastResult:Boolean) extends Callable[Boolean]{
override def call(): Boolean = {
try{
val rdd2=rdd1.map{x=>(x._1,(x._2,x._3))}
val rdd3=rdd2.reduceByKey{
(a,b)=>{
if(a._1!=b._1)
throw new MyException
else
a
}
}
println(rdd3.count)
false
}catch{
case e:Exception=>{
true
}
}
}
}
//how to submit task
val exec =new ThreadPoolExecutor(1,1,0
,TimeUnit.SECONDS,new LinkedBlockingQueue[Runnable]())
var f:Future[Boolean]=null
var firstCal=true
while (flag){
if(firstCal){
f = exec.submit(new CalThread(false))
firstCal=false
}
else{
f = exec.submit(new CalThread(f.get()))
}
}
- Этап карты и этап reduByKey выполняются синхронно, или существует параллельная ситуация?
- Мой код доступен в этом методе, будет ли ошибка? Например, следующий поток обнаружил исключение MyException, выброшенное в предыдущем задании.