Spark Streaming + Kinesis: в первые несколько микропакетов данные не поступают - PullRequest
0 голосов
/ 23 марта 2020

Я разрабатываю приложение Spark (v 2.4.3) для обработки ежедневного пакета данных. Я пытаюсь использовать потоки данных Amazon Kinesis для предоставления входных значений. Так как я не нашел, как использовать Kinesis в качестве источника данных для Spark Batch, я получил следующий обходной путь для использования Spark Streaming (как предложено в Killing spark streaming job, когда нет действия ), чтобы Кластер EMR со Spark может работать только в течение небольшого периода:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object TestKinesisApp {

  def main(args: Array[String]): Unit = {

    val appName = "TestKinesisApp"
    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval
    val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
    val regionName = "us-east-1"
    val streamName = "arundhaj-stream"

    val conf = new SparkConf().setAppName(appName)
    val ssc = new StreamingContext(conf, batchInterval)

    @volatile var counter = 0

    val kinesisStream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .endpointUrl(endpointUrl)
      .regionName(regionName)
      .streamName(streamName)
      .initialPosition(new TrimHorizon())
      .checkpointAppName(appName)
      .checkpointInterval(kinesisCheckpointInterval)
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .build()

    kinesisStream.foreachRDD { rdd =>
      val fetched = rdd.collect()
      if (fetched.size == 0) {
        println(s"Fetched data: size = ${fetched.size}")
        counter = counter + 1
        if (counter == 20) {
          // when stopGracefully=true,the job hangs with "INFO scheduler.JobGenerator: Stopped JobGenerator"
          ssc.stop(true, false)
        }
      } else {
        println(s"Fetched data: size = ${fetched.size}")
        fetched.foreach(x => println("Fetched data: record - " + new String(x)))
        counter = 0
        // here is some business logic
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Моя проблема по какой-то странной причине: данные не приходят из Kineses во время первых 15 вызовов foreachRDD (хотя новые данные находятся в Kinesis, а последний имеет 1 осколок), поэтому я вынужден использовать счетчик и проверить, не будут ли получены данные для достаточно большого количества вызовов. Есть ли лучший способ сделать это?

Заранее спасибо.

...