Искра QueueStream никогда не исчерпывается - PullRequest
0 голосов
/ 04 января 2019

Озадаченный частью кода, который я позаимствовал из Интернета для исследовательских целей. Это код:

import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

val spark = ... 

val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1)) 

val rddQueue = new mutable.Queue[RDD[Char]]()
val QS = ssc.queueStream(rddQueue) 

QS.foreachRDD(q=> {
   print("Hello") // Queue never exhausted
   if(!q.isEmpty) {
       ... do something
       ... do something
   }
}
)

//ssc.checkpoint("/chkpoint/dir") if unchecked causes Serialization error

ssc.start()
for (c <- 'a' to 'c') {
    rddQueue += ssc.sparkContext.parallelize(List(c))
}
ssc.awaitTermination()

Я просматривал его, чтобы проверить и заметил, что «привет» печатается навсегда:

 HelloHelloHelloHelloHelloHelloHelloHelloHelloHello and so on

Я бы подумал, что queueStream исчерпает себя через 3 итерации.

Итак, что я пропустил?

1 Ответ

0 голосов
/ 04 января 2019

Понял. На самом деле он исчерпан, но цикл продолжается, и поэтому утверждение

 if(!q.isEmpty)

там.

ОК, подумал бы, что он просто остановится или, скорее, не выполнится, но не так. Я вспомнил. Пустой СДР будет получен, если ничего не передано, в зависимости от времени интервала пакета. Оставив для других, как было upvote.

Однако, несмотря на то, что он устаревший, это плохой пример добавления контрольной точки вызывает ошибку сериализации. Оставляя это на благо других.

ssc.checkpoint("/chkpoint/dir")
...