Обрабатывать необработанные файлы S3 с помощью Spark Streaming Checkpoint - PullRequest
0 голосов
/ 22 сентября 2018

Я пытаюсь обработать файлы на S3 через Spark Streaming на EMR.Я включил контрольную точку на S3 (как стартер).У меня вопрос: если кластер выходит из строя или поврежден, и я поднимаю новый, как мне собрать все файлы, которые накопились на моем S3-контейнере во время простоя?

В настоящее время он начинает обрабатывать файлыпосле того, как кластер подходит.Я пытался использовать контрольную точку в своем коде, но я не вижу, чтобы предыдущие файлы были подобраны.Есть ли правильный способ сделать это?

Пример кода, который я использую в настоящее время:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("SimpleApplication").setMaster("local[*]")
val sc1 = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc1,Seconds(10))
ssc.checkpoint("s3n://test-bucket/spark-streaming/metadata/")
val lines = ssc.textFileStream("s3n://test-bucket/raw-data/")

val a = lines.count()
a.saveAsTextFiles("s3n://rachit-nvr-del/spark-streaming/data/")
...