SparkException: Попытка использовать BlockRDD [247] в receiveStream в после того, как его блоки были удалены, ошибка при использовании пользовательского получателя - PullRequest
0 голосов
/ 28 сентября 2018

У меня есть простой приемник, который генерирует фиктивные данные и используется в контексте потоковой передачи.Фрагмент кода ниже.

Время от времени задание выдает следующее исключение.Это вызвано некоторой гонкой между записью и чтением данных?Что такое хороший способ справиться с этим?

org.apache.spark.SparkException: Попытка использовать BlockRDD [247] в receiveStream в после того, как его блоки были удалены!

class DummyStreamingSource extends Receiver[(Int,Long)](StorageLevel.MEMORY_AND_DISK_2) {

  /** Start the thread that receives data over a connection */
  def onStart() {
    new Thread("Test Source") { override def run() { receive() } }.start()
  }

  def onStop() {  }

  /** Periodically generate a random number and the timestamp */
  private def receive() {

    while(!isStopped()) {     
     store(Iterator((Random.nextInt(10), System.currentTimeMillis)))  
      Thread.sleep(500)
    }
  }
}

...

streamingContext.receiverStream(new DummyStreamingSource())
      .foreachRDD((rdd, time)=>{
...

}
...