Вопрос об использовании пула потоков для отправки задач по очереди в виде синхронных очередей в spark и перехвата в нем исключения - PullRequest
0 голосов
/ 27 июня 2019

Моя программа spark отправляет задачи в пул потоков в очереди синхронной блокировки. Начало следующей задачи зависит от результата предыдущей задачи. Одна базовая задача состоит из этапа карты и этапа reduByKey.

Этап карты преобразует массив rdd в пару ключ-значение rdd. Этап reduByKey выполняет задачу сопоставления в соответствии с парой ключ-значение. Если ключ двух строк одинаков, но первый бит значения отличается, MyException будет выброшено, и исключение будет перехвачено в текущий поток, возвращающий значение и заканчивающий поток и отправляющий следующую задачу; если lowerByKey обходит все данные и не находит подходящую строку, он не будет генерировать MyException. Я использую счетчик для запуска двух операций преобразования, описанных выше.

Согласно моему предыдущему пониманию разбиения на этапе искры, операция reduBykey должна ждать, пока операция карты не будет выполнена, но я вижу в веб-интерфейсе, что MyException также будет генерироваться на некоторых этапах карты. Итак, я хочу знать:

  1. Этап карты и этап reduByKey выполняются синхронно, или существует параллельная ситуация?
  2. Мой код доступен в этом методе, будет ли ошибка? Например, следующий поток обнаружил исключение MyException, выброшенное в предыдущем задании.
//source rdd looks like this
val seq1=Seq((1,2,3),(4,5,6),(7,8,9),.....)
val rdd1=sc.parallelize(seq1)

//Single compute thread
class MyException extends Exception
class CalThread(lastResult:Boolean) extends Callable[Boolean]{
  override def call(): Boolean = {
    try{
      val rdd2=rdd1.map{x=>(x._1,(x._2,x._3))}
      val rdd3=rdd2.reduceByKey{
        (a,b)=>{
          if(a._1!=b._1)
           throw new MyException
          else
           a
        }
      }
      println(rdd3.count)
      false
    }catch{
      case e:Exception=>{
        true
      }
    }

  }
}

//how to submit task
val exec =new ThreadPoolExecutor(1,1,0
      ,TimeUnit.SECONDS,new LinkedBlockingQueue[Runnable]())
var f:Future[Boolean]=null
var firstCal=true
while (flag){
  if(firstCal){
    f = exec.submit(new CalThread(false))
    firstCal=false
  }
  else{
    f = exec.submit(new CalThread(f.get()))
  }
}
  1. Этап карты и этап reduByKey выполняются синхронно, или существует параллельная ситуация?
  2. Мой код доступен в этом методе, будет ли ошибка? Например, следующий поток обнаружил исключение MyException, выброшенное в предыдущем задании.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...