Я разрабатываю приложение Spark (v 2.4.3) для обработки ежедневного пакета данных. Я пытаюсь использовать потоки данных Amazon Kinesis для предоставления входных значений. Так как я не нашел, как использовать Kinesis в качестве источника данных для Spark Batch, я получил следующий обходной путь для использования Spark Streaming (как предложено в Killing spark streaming job, когда нет действия ), чтобы Кластер EMR со Spark может работать только в течение небольшого периода:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
object TestKinesisApp {
def main(args: Array[String]): Unit = {
val appName = "TestKinesisApp"
val batchInterval = Milliseconds(2000)
val kinesisCheckpointInterval = batchInterval
val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
val regionName = "us-east-1"
val streamName = "arundhaj-stream"
val conf = new SparkConf().setAppName(appName)
val ssc = new StreamingContext(conf, batchInterval)
@volatile var counter = 0
val kinesisStream = KinesisInputDStream.builder
.streamingContext(ssc)
.endpointUrl(endpointUrl)
.regionName(regionName)
.streamName(streamName)
.initialPosition(new TrimHorizon())
.checkpointAppName(appName)
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
kinesisStream.foreachRDD { rdd =>
val fetched = rdd.collect()
if (fetched.size == 0) {
println(s"Fetched data: size = ${fetched.size}")
counter = counter + 1
if (counter == 20) {
// when stopGracefully=true,the job hangs with "INFO scheduler.JobGenerator: Stopped JobGenerator"
ssc.stop(true, false)
}
} else {
println(s"Fetched data: size = ${fetched.size}")
fetched.foreach(x => println("Fetched data: record - " + new String(x)))
counter = 0
// here is some business logic
}
}
ssc.start()
ssc.awaitTermination()
}
}
Моя проблема по какой-то странной причине: данные не приходят из Kineses во время первых 15 вызовов foreachRDD (хотя новые данные находятся в Kinesis, а последний имеет 1 осколок), поэтому я вынужден использовать счетчик и проверить, не будут ли получены данные для достаточно большого количества вызовов. Есть ли лучший способ сделать это?
Заранее спасибо.