Мой контекст таков, что у меня есть специальный приемник искры, который получает поток данных от конечной точки http.Точка httpend обновляется каждые 30 секунд с новыми данными.Так что моему приложению потоковой передачи не имеет смысла агрегировать все данные за 30 секунд, поскольку это, очевидно, приводит к дублированию данных (когда я сохраняю dstream в виде файла, каждый файл детали, представляющий rdd, в точности совпадает)).
Чтобы избежать процесса дедупликации, я хочу 5-секундный фрагмент этого окна.Я хочу использовать функцию слайса в API DStream.Есть два способа использования этой функции: 1. slice (fromTime: Time, toTime: Time) 2. slice (интервал: Interval)
Хотя второй параметр является открытым методом, класс Interval является закрытым.Я поднял вопрос о джира спарк, но это еще одна проблема (https://issues.apache.org/jira/browse/SPARK-27206)
Мой вопрос относится к первому варианту. Я делаю следующее
val sparkSession = getSparkSession(APP_NAME)
val batchInterval:Duration = Durations.seconds(30)
val windowDuration:Duration = Durations.seconds(60)
val slideDuration:Duration = Durations.seconds(30)
val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
ssc.checkpoint("some path")
val memTotal:ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("http endpoint", true))
val dstreamMemTotal = memTotal.window(windowDuration, slideDuration)
Все хорошо доэтот момент. Однако, когда я добавляю функцию слайда, такую как следующее
val a = dstreamMemTotal.slice(currentTime, currentTime.+(Durations.seconds(5)))
, я получаю следующую ошибку.
exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.WindowedDStream@62315f22 has not been initialized
at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:880)
at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:878)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:878)
Любые указатели, пожалуйста?