Искровой поток сохраните Base64 RDD до json на S3 - PullRequest
0 голосов
/ 07 февраля 2020

Приведенное ниже приложение scala не может сохранить rdd в формате json на S3

. У меня есть: -

  1. поток кинезиса, в котором сложные объекты помещены в поток , К этому объекту был применен JSON .stringify () перед его помещением в поток как часть метода Kinesis PutRecord.
  2. Задание Spark Stream scala считывает эти элементы из потока,

Кажется, я не могу сохранить запись rdd, которая поступает из потока, в файл json на S3 ведро.

В коде я попытался преобразовать RDD [байты] в RDD [String], а затем загрузить с помощью spark.read. json, но не повезло. Я пробовал различные другие комбинации и не могу вывести на S3 в необработанном формате.

import org.apache.spark._
import org.apache.spark.sql._
import java.util.Base64
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.kinesis.KinesisInputDStream

import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

object ScalaStream {
  def main(args: Array[String]): Unit = {  
        val appName = "ScalaStreamExample"
        val batchInterval = Milliseconds(2000)
        val outPath = "s3://xxx-xx--xxx/xxxx/"

        val spark = SparkSession
            .builder()
            .appName(appName)
            .getOrCreate()

        val sparkContext = spark.sparkContext
        val streamingContext = new StreamingContext(sparkContext, batchInterval)

        // Populate the appropriate variables from the given args
        val checkpointAppName = "xxx-xx-xx--xx"
        val streamName = "cc-cc-c--c--cc"
        val endpointUrl = "https://kinesis.xxx-xx-xx.amazonaws.com"
        val regionName = "cc-xxxx-xxx"
        val initialPosition = new Latest()
        val checkpointInterval = batchInterval
        val storageLevel = StorageLevel.MEMORY_AND_DISK_2

        val kinesisStream = KinesisInputDStream.builder
         .streamingContext(streamingContext)
         .endpointUrl(endpointUrl)
         .regionName(regionName)
         .streamName(streamName)
         .initialPosition(initialPosition)
         .checkpointAppName(checkpointAppName)
         .checkpointInterval(checkpointInterval)
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
         .build()

        kinesisStream.foreachRDD { rdd =>
            if (!rdd.isEmpty()){
                //**************** .  <---------------
                // This is where i'm trying to save the raw json object to s3 as json file
                // tried various combinations here but no luck. 
                val dataFrame = rdd.map(record=>new String(record)) // convert bytes to string
                dataFrame.write.mode(SaveMode.Append).json(outPath + "/" + rdd.id.toString())
                //**************** <----------------
            }
        }

        // Start the streaming context and await termination
        streamingContext.start()
        streamingContext.awaitTermination()
    }
}

Есть идеи, что мне не хватает?

1 Ответ

1 голос
/ 07 февраля 2020

Так что это была полная красная сельдь, почему она не работала. Оказывается, это был конфликт версии scala с тем, что доступно в EMR.

Многие аналогичные вопросы задавались в SO, в которой предполагалось, что это может быть проблемой, но хотя списки документации искры 2.12.4 совместимы с искрой 2.4.4, экземпляр EMR не поддерживает scala версию 2.12. 4. Итак, я обновил свой build.sbt и развернул сценарий с

build.sbt:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.8"

ibraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "2.4.4"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4"
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.12" % "2.4.4"

до:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.4" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4"

deploy. sh

aws emr add-steps --cluster-id j-xxxxx --steps Type=spark,Name=ScalaStream,Args=[\
--class,"ScalaStream",\
--deploy-mode,cluster,\
--master,yarn,\
--packages,\'org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4\',\
--conf,spark.yarn.submit.waitAppCompletion=false,\
--conf,yarn.log-aggregation-enable=true,\
--conf,spark.dynamicAllocation.enabled=true,\
--conf,spark.cores.max=4,\
--conf,spark.network.timeout=300,\
s3://ccc.xxxx/simple-project_2.11-1.0.jar\
],ActionOnFailure=CONTINUE
...