У меня есть простой приемник, который генерирует фиктивные данные и используется в контексте потоковой передачи.Фрагмент кода ниже.
Время от времени задание выдает следующее исключение.Это вызвано некоторой гонкой между записью и чтением данных?Что такое хороший способ справиться с этим?
org.apache.spark.SparkException: Попытка использовать BlockRDD [247] в receiveStream в после того, как его блоки были удалены!
class DummyStreamingSource extends Receiver[(Int,Long)](StorageLevel.MEMORY_AND_DISK_2) {
/** Start the thread that receives data over a connection */
def onStart() {
new Thread("Test Source") { override def run() { receive() } }.start()
}
def onStop() { }
/** Periodically generate a random number and the timestamp */
private def receive() {
while(!isStopped()) {
store(Iterator((Random.nextInt(10), System.currentTimeMillis)))
Thread.sleep(500)
}
}
}
...
streamingContext.receiverStream(new DummyStreamingSource())
.foreachRDD((rdd, time)=>{
...
}