Не в состоянии выполнять преобразования на входе Dines Kinesis - PullRequest
0 голосов
/ 10 января 2019

Я пытаюсь использовать поток кинезиса и выполнять преобразования на нем с помощью потоковой искры, но не получаю результатов.

Теперь приведенный ниже код в условии if работает нормально. когда я получаю ненулевые записи в потоке, он печатает счет. Если я использую logger.warn ("") в предложении if в случае операторов печати, я просто продолжаю получать строки ниже в моем журнале, а не фактическое количество:

INFO BlockManagerInfo: добавлен ввод-0-1547121901568 в памяти на ip-10-225-172-215.ec2.internal: 37820

Не знаю, почему spark не может выполнить count () при использовании логгера.

Также до сих пор я могу сохранять str rdd в формате hdf. Мой поток выводит записи в формате JSON.

Теперь, если я попытаюсь проанализировать мой поток как json и загрузить, как показано ниже, снова я получаю только информацию ниже в моем журнале:

INFO BlockManagerInfo: добавлен ввод-0-1547121901577 в памяти на ip-10-225-172-70.ec2.internal: 39438 (размер: 6,4 МБ, бесплатно: 1238,7 МБ) ИНФОРМАЦИЯ BlockManagerInfo: добавлен input-0-1547121901578 в памяти на ip-10-225-172-215.ec2.internal: 37820 (размер: 5,1 МБ, бесплатно: 1233,6 МБ) ИНФОРМАЦИЯ BlockManagerInfo: добавлен input-0-1547121901578 в памяти на ip-10-225-172-70.ec2.internal: 39438 (размер: 5,1 МБ, бесплатно: 1233,6 МБ) ИНФОРМАЦИЯ JobScheduler: добавлены задания на время 1547121755000 мс

Вот код, который я использую:

object KinesisWatch {


  val logger: Logger = Logger.getLogger("##### Kinesis-Logs #####")
  Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
  Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.INFO)
  logger.setLevel(Level.INFO)

  // Kinesis, AWS and Spark streaming configurations
  val kinesisConfig = Map(
    ("appName", "spark-kinesis"),
    ("streamName", "xxxx"),
    ("endpointUrl", "https://kinesis.us-east-1.amazonaws.com"),
    ("numStreams", "2"),
    ("regionName", "us-east-1"),
    ("awsAccessKey", "xxxxx"),
    ("awsSecretKey", "xxxxxx"),
    ("sparkStreamingBatchInterval", "5"),
    ("kinesisCheckpointInterval", "5"))

  logger.warn("Setting up SparkConfig and StreamingContext")
  val sparkSession = new SparkSessionWrapper().sparkSession
  val sc = sparkSession.sparkContext
  val sqlContext = sparkSession.sqlContext
  val ssc = new StreamingContext(sc, Seconds(kinesisConfig("sparkStreamingBatchInterval").toInt))

  val awsCredentials = SparkAWSCredentials.builder.basicCredentials(kinesisConfig("awsAccessKey"), kinesisConfig("awsSecretKey"))

  val kinesisStreams = (0 until kinesisConfig("numStreams").toInt).map { i =>
    KinesisInputDStream.builder
      .streamingContext(ssc)
      .endpointUrl(kinesisConfig("endpointUrl"))
      .regionName(kinesisConfig("regionName"))
      .streamName(kinesisConfig("streamName"))
      .initialPositionInStream(InitialPositionInStream.LATEST)
      .checkpointAppName(kinesisConfig("appName"))
      .checkpointInterval(Seconds(kinesisConfig("kinesisCheckpointInterval").toInt))
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .kinesisCredentials(awsCredentials.build())
      .build()
  }
  val unionStreams = ssc.union(kinesisStreams)

  unionStreams.foreachRDD(rdd => {
    if
    (rdd.count()>0){
      println("New records found\nmetrics count in the batch: %s".format(rdd.count()))
      println("performing transformations")
      val str = rdd.map(str => new String(str))
      str.saveAsTextFile("/home/aman/EDA_BP_Kinesis")    
    }
    else
      println("No new record found")
  })    
  ssc.start()
  ssc.awaitTermination()

}

Не работает:

 unionStreams.foreachRDD(rdd => {
    if
    (rdd.count()>0){
      println("New records found\nmetrics count in the batch: %s".format(rdd.count()))
      println("performing transformations")
      val str = rdd.map(str => new String(str))
      str.saveAsTextFile("/home/aman/EDA_BP_Kinesis")

      import sparkSession.implicits._
      val records = str.toString()
      val df = sparkSession.read.json(Seq(records).toDS)
      df.count()
      df.show()
    }
    else
      println("No new record found")
  })

Я указал количество шардов 2, потоков 2 и ядер 4.

Ответы [ 2 ]

0 голосов
/ 15 января 2019

Попробуйте создать отдельную сессию Spark в foreach. Надеюсь, что это решит проблему тупика.

 unionStreams.foreachRDD(rdd => {
    if (rdd.count() > 0) {
      println("New records found\nmetrics count in the batch: %s".format(rdd.count()))
      println("performing transformations")
      val str = rdd.map(str => new String(str))
      str.saveAsTextFile("/home/aman/EDA_BP_Kinesis")
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()

      import spark.implicits._
      val records = str.toString()
      val df = spark.read.json(Seq(records).toDS)
      df.count()
      df.show()
    } else
      println("No new record found")})
0 голосов
/ 10 января 2019

Можете ли вы попытаться увеличить количество работников? Я полагаю, что в случае потоковой передачи Spark с Amazon Kinesis каждый осколок требует отдельного работника. Поэтому, если у вас недостаточно работников, ваш код не работает.

Почему запись в hdfs успешна? Я предполагаю, что операция записи в файл не требует перемешивания, поэтому каждый работник, читающий осколок, может справиться с этим самостоятельно. Но для операции count() это потребует некоторой случайности, поэтому потребуется больше рабочих.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...