разделение с помощью искры разрывает цепочку отложенных вычислений и вызывает ошибку, которую я не могу уловить - PullRequest
0 голосов
/ 22 марта 2019

При выполнении перераспределения искра разрывает цепочку ленивых вычислений и вызывает ошибку, которую я не могу контролировать / ловить.

//simulation of reading a stream from s3
def readFromS3(partition: Int) : Iterator[(Int, String)] = {
    Iterator.tabulate(3){idx => 
        // simulate an error only on partition 3 record 2
        (idx, if(partition == 3 && idx == 2) throw new RuntimeException("error") else s"elem $idx on partition $partition" )
    }    
}

val rdd = sc.parallelize(Seq(1,2,3,4))
            .mapPartitionsWithIndex((partitionIndex, iter) => readFromS3(partitionIndex))

// I can do whatever I want here

//this is what triggers the evaluation of the iterator 
val partitionedRdd = rdd.partitionBy(new HashPartitioner(2))

// I can do whatever I want here

//desperately trying to catch the exception 
partitionedRdd.foreachPartition{ iter => 
    try{
        iter.foreach(println)
    }catch{
        case _ => println("error caught")
    }
}

Прежде чем комментировать, имейте в виду, что:

  1. Это слишком упрощение моего реального приложения
  2. Я знаючтение из s3 может быть сделано по-другому, и что я должен использовать sc.textFile.Я не могу это контролировать, я не могу это изменить.
  3. Я понимаю, в чем проблема: при разбиении spark прерывает ленивую цепочку-оценку и вызывает ошибку.Я должен это сделать!
  4. Я не утверждаю, что в искре есть ошибка, спарк должен оценить записи для перетасовки
  5. Я могу делать только то, что хочу:
    • между чтением из s3 и разбиением
    • после разбиения
  6. Я могу написать свой собственный разделитель

Учитываяограничения, упомянутые выше, могу ли я обойти это?Есть ли решение?

1 Ответ

0 голосов
/ 23 марта 2019

Единственное решение, которое я смог найти, было иметь EvaluateAheadIterator (тот, который оценивает заголовок буфера перед вызовом iterator.next)

import scala.collection.AbstractIterator
import scala.util.control.NonFatal

class EvalAheadIterator[+A](iter : Iterator[A]) extends AbstractIterator[A] {

  private val bufferedIter  : BufferedIterator[A] = iter.buffered

  override def hasNext: Boolean =
    if(bufferedIter.hasNext){
      try{
          bufferedIter.head //evaluate the head and trigger potential exceptions
          true
      }catch{
          case NonFatal(e) =>
            println("caught exception ahead of time")
            false
      }
    }else{
        false
    }


  override def next() : A = bufferedIter.next()
}

Теперь мы должны применить EvalAheadIterator в mapPartition:

//simulation of reading a stream from s3
def readFromS3(partition: Int) : Iterator[(Int, String)] = {
    Iterator.tabulate(3){idx => 
        // simulate an error only on partition 3 record 2
        (idx, if(partition == 3 && idx == 2) throw new RuntimeException("error") else s"elem $idx on partition $partition" )
    }    
}

val rdd = sc.parallelize(Seq(1,2,3,4))
            .mapPartitionsWithIndex((partitionIndex, iter) => readFromS3(partitionIndex))
            .mapPartitions{iter => new EvalAheadIterator(iter)}

// I can do whatever I want here

//this is what triggers the evaluation of the iterator 
val partitionedRdd = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))

// I can do whatever I want here

//desperately trying to catch the exception 
partitionedRdd.foreachPartition{ iter => 
    try{
        iter.foreach(println)
    }catch{
        case _ => println("error caught")
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...