Соединение (join) двух Spark DStream со сложной вложенной структурой - PullRequest
/ 23 ноября 2018

Я реализовал пользовательский приёмник (custom receiver) Spark для получения DStream по HTTP/REST следующим образом:

// One receiver-based input stream per HTTP endpoint.
// NOTE: StreamingContext.start() requires at least one output operation (foreachRDD, print, ...)
// registered on some DStream; the log below fails in ssc.start() with
// "No output operations registered, so nothing to execute".
val mem1Total: ReceiverInputDStream[String] =
  ssc.receiverStream(new CustomReceiver("httpURL1"))
val mem2Total: ReceiverInputDStream[String] =
  ssc.receiverStream(new CustomReceiver("httpURL2"))

// Sliding windows over each stream: 30-second window length, 10-second slide interval.
val dstreamMem1: DStream[String] =
  mem1Total.window(Durations.seconds(30), Durations.seconds(10))
val dstreamMem2: DStream[String] =
  mem2Total.window(Durations.seconds(30), Durations.seconds(10))

Каждый поток имеет следующую схему:

// JSON layout expected for every record of both streams:
//   status: string
//   data: struct
//     resultType: string
//     result: array of struct
//       metric: struct { application, component, instance: string }
//       value:  array of string
// NOTE(review): this excerpt is truncated; the closing parentheses after the "value" field are missing.
val schema = StructType(Seq(
            StructField("status", StringType),
            StructField("data", StructType(Seq(
                StructField("resultType", StringType),
                StructField("result", ArrayType(StructType(Array(
                    StructField("metric", StructType(Seq(StructField("application", StringType),
                                                         StructField("component", StringType),
                                                         StructField("instance", StringType)))), 
                    StructField("value", ArrayType(StringType))

Вот как далеко мне удалось продвинуться в извлечении нужных мне полей из dstreamMem1:

  // For each windowed RDD of raw JSON strings from dstreamMem1: convert it to a Dataset
  // (its single column is named `value`), parse that column with `schema`, then project
  // the metric fields and the first two elements of the `value` array.
  dstreamMem1.foreachRDD { rdd =>
  import sparkSession.implicits._
  val df = rdd.toDS()                        
                    .selectExpr("cast (value as string) as myData") 
                    .select(from_json($"myData", schema).as("myDataEvent"))                  
                    // NOTE(review): the previous step only produces a column named `myDataEvent`;
                    // nothing defines `flat`. Presumably the `data.result` array must be exploded
                    // first, e.g. .select(explode($"myDataEvent.data.result").as("flat")) — confirm.
                    .select($"flat.metric.*", $"flat.value".getItem(0).as("value1"), $"flat.value".getItem(1).as("value2"))

Однако я не могу понять, как соединить (join) dstreamMem1 с dstreamMem2 с учётом сложной вложенной структуры. Я могу выполнить объединение (union) dstreamMem1 и dstreamMem2, но в моём случае это не подходит, потому что поля «value» в каждом потоке означают разные вещи. Есть идеи?

Правка №1. Основываясь на следующих ресурсах: «Как создать пользовательский источник потоковых данных?», https://github.com/apache/spark/pull/21145, https://github.com/hienluu/structured-streaming-sources/tree/master/streaming-sources/src/main/scala/org/structured_streaming_sources/twitter

Мне удалось создать следующий класс, реализующий этот источник:

        // Micro-batch reader (DataSource V2 MicroBatchReader) for the custom HTTP streaming source.
        // Configuration is read from `options` using the key constants in SSPStreamingSource;
        // received records are buffered in sspList and handed out per batch by planInputPartitions().
        class SSPStreamMicroBatchReader(options: DataSourceOptions) extends MicroBatchReader with Logging {

          // Endpoint passed to AccessACS in receive(); empty string when the option is absent.
          private val httpURL = options.get(SSPStreamingSource.HTTP_URL).orElse("") //.toString()
          // NOTE(review): planInputPartitions() uses this as `grouped(numPartitions)`, which makes
          // chunks of numPartitions records each, not numPartitions partitions. Confirm intent.
          private val numPartitions = options.get(SSPStreamingSource.NUM_PARTITIONS).orElse("5").toInt
          // Capacity of the bounded sspQueue allocated in initialize().
          private val queueSize = options.get(SSPStreamingSource.QUEUE_SIZE).orElse("512").toInt
          // Level used by internalLog(): "warn", "info" or "debug"; other values match its catch-all case.
          private val debugLevel = options.get(SSPStreamingSource.DEBUG_LEVEL).orElse("debug").toLowerCase

          // Offset bookkeeping. -1 is the "no data" sentinel (see NO_DATA_OFFSET and the -1 checks
          // in getStartOffset()/getEndOffset()).
          private val NO_DATA_OFFSET = SSPOffset(-1)
          private var startOffset: SSPOffset = new SSPOffset(-1)
          private var endOffset: SSPOffset = new SSPOffset(-1)
          // Advanced by receive() for every element taken from sspQueue.
          private var currentOffset: SSPOffset = new SSPOffset(-1)
          // Starts at -2, strictly below the -1 sentinel used by the other offsets.
          private var lastReturnedOffset: SSPOffset = new SSPOffset(-2)
          private var lastOffsetCommitted : SSPOffset = new SSPOffset(-1)

          // Incremented in receive() per element polled from sspQueue; reported in stop().
          private var incomingEventCounter = 0;
          // Set by stop(); polled by the receive() loop, which runs on the thread created in initialize().
          // NOTE(review): not @volatile, so the receiver thread is not guaranteed to see the update.
          private var stopped:Boolean = false

          // Connection obtained in receive(); checked for null in stop().
          private var acsURLConn:HttpURLConnection = null
          // NOTE(review): never assigned in the visible code (initialize() does not store its Thread here).
          private var worker:Thread = null

          // Buffer of received records; planInputPartitions() slices it relative to lastOffsetCommitted.
          // NOTE(review): the element type StreamingQueryStatus looks carried over from the Twitter
          // example this code is based on. Confirm the intended record type.
          private val sspList:ListBuffer[StreamingQueryStatus] = new ListBuffer[StreamingQueryStatus]()
          // Bounded hand-off queue, allocated in initialize() and polled by receive().
          private var sspQueue:BlockingQueue[StreamingQueryStatus] = null


          // Allocates sspQueue (capacity queueSize) and creates the "Socket Receiver" thread,
          // whose run() calls receive().
          // NOTE(review): the thread's .start() call is not visible in this excerpt; the log line
          // "Inside run ...." shows the thread does run.
          private def initialize(): Unit = synchronized {
            log.warn(s"Inside initialize ....")
            sspQueue = new ArrayBlockingQueue(queueSize)

            // The log.warn before `override def run` is in the anonymous class's constructor body,
            // so "Inside thread ...." is logged on the calling thread, not the new one.
            new Thread("Socket Receiver") { log.warn(s"Inside thread ....") 
              override def run() { 
                log.warn(s"Inside run ....") 
                receive() }

          // Called from run() of the "Socket Receiver" thread created in initialize().
          // Obtains a connection via AccessACS(httpURL).getACSConnection() and wraps its input stream
          // in a UTF-8 BufferedReader. It then reads one line into `userInput` and loops until
          // `stopped` is set. Each iteration polls sspQueue with a 100 ms timeout and advances
          // currentOffset / incomingEventCounter for every element taken.
          // NOTE(review): nothing in the visible code offers elements to sspQueue, and `userInput`
          // is never used, so this loop never sees data. Presumably each line read from `reader`
          // should be parsed, offered to sspQueue and appended to sspList. Confirm.
          // NOTE(review): `reader` is not closed anywhere in the visible lines.
          private def receive(): Unit = {
            log.warn(s"Inside recieve() ....")
            var userInput: String = null

              acsURLConn = new AccessACS(httpURL).getACSConnection(); 

              // Until stopped or connection broken continue reading
              val reader = new BufferedReader(
                new InputStreamReader(acsURLConn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))

              userInput = reader.readLine()

              while(!stopped) {
                  // poll tweets from queue
                  val tweet:StreamingQueryStatus = sspQueue.poll(100, TimeUnit.MILLISECONDS)

                  if (tweet != null) {  
                    // NOTE(review): mutated on this thread without `synchronized`, while
                    // setOffsetRange() also reads currentOffset from whatever thread calls it.
                    // Confirm thread-safety.
                    currentOffset = currentOffset + 1  
                    incomingEventCounter = incomingEventCounter + 1;


          // Builds the input partitions for the range set by setOffsetRange(). It takes the records
          // for offsets startOffset+1 .. endOffset from sspList and groups them into
          // SSPStreamBatchTask partitions. Slice indices are computed relative to lastOffsetCommitted,
          // so this assumes sspList(0) holds offset lastOffsetCommitted + 1.
          override def planInputPartitions(): java.util.List[InputPartition[org.apache.spark.sql.catalyst.InternalRow]] = {
            synchronized {
              log.warn(s"Inside planInputPartitions ....")


              val startOrdinal = startOffset.offset.toInt + 1
              val endOrdinal = endOffset.offset.toInt + 1

              internalLog(s"createDataReaderFactories: sOrd: $startOrdinal, eOrd: $endOrdinal, " +
                s"lastOffsetCommitted: $lastOffsetCommitted")

              // Re-enters the monitor already held by the enclosing `synchronized` (same `this`),
              // so this inner lock is redundant but harmless.
              val newBlocks = synchronized {
                val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
                val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
                assert(sliceStart <= sliceEnd, s"sliceStart: $sliceStart sliceEnd: $sliceEnd")
                sspList.slice(sliceStart, sliceEnd)

              // NOTE(review): grouped(n) yields chunks of n records each, so numPartitions acts as
              // "records per partition" here, not as the number of partitions. Confirm intent.
              // NOTE(review): SSPStreamBatchTask is an InputPartition[Row]. This cast is unchecked
              // because of erasure, so Spark will later consume the produced Rows as InternalRows,
              // which likely fails with a ClassCastException. The partitions should produce InternalRow.
              newBlocks.grouped(numPartitions).map { block =>
                                                        new SSPStreamBatchTask(block).asInstanceOf[InputPartition[org.apache.spark.sql.catalyst.InternalRow]]

          // Records the offset range for the next batch. A missing start falls back to NO_DATA_OFFSET.
          // A missing end falls back to currentOffset, the latest offset advanced by receive().
          // The visible body of the `if` only logs when the given start differs from currentOffset.
          override def setOffsetRange(start: Optional[Offset],
                                      end: Optional[Offset]): Unit = {

            if (start.isPresent && start.get().asInstanceOf[SSPOffset].offset != currentOffset.offset) {
              internalLog(s"setOffsetRange: start: $start, end: $end currentOffset: $currentOffset")

            this.startOffset = start.orElse(NO_DATA_OFFSET).asInstanceOf[SSPOffset]
            this.endOffset = end.orElse(currentOffset).asInstanceOf[SSPOffset]

          // Fails with IllegalStateException while startOffset is still the -1 sentinel.
          // NOTE(review): the return statement is not visible in this excerpt; presumably it returns startOffset.
          override def getStartOffset(): Offset = {
            internalLog("getStartOffset was called")
            if (startOffset.offset == -1) {
              throw new IllegalStateException("startOffset is -1")

          // End offset of the current batch. The branch for the -1 sentinel is empty in this excerpt.
          // Otherwise, when endOffset has moved past lastReturnedOffset, it is logged and remembered.
          // NOTE(review): the return value is not visible here; presumably endOffset (or
          // NO_DATA_OFFSET in the -1 case). Confirm.
          override def getEndOffset(): Offset = {
            if (endOffset.offset == -1) {
            } else {

              if (lastReturnedOffset.offset < endOffset.offset) {
                internalLog(s"** getEndOffset => $endOffset)")
                lastReturnedOffset = endOffset



          // Spark calls this once data up to `end` has been processed. It rejects offsets that
          // SSPOffset.convert does not recognize as this source's, and rejects commits that move
          // backwards. It then records `end` as the last committed offset.
          // NOTE(review): `offsetDiff` is only used for the ordering check. planInputPartitions()
          // indexes sspList relative to lastOffsetCommitted. That is only correct if the first
          // `offsetDiff` records are dropped here, e.g. `sspList.trimStart(offsetDiff)` under
          // `synchronized`. Without it, later batches read the wrong slice and sspList grows
          // without bound. Confirm whether that step was lost.
          override def commit(end: Offset): Unit = {
            internalLog(s"** commit($end) lastOffsetCommitted: $lastOffsetCommitted")

            val newOffset = SSPOffset.convert(end).getOrElse(
              sys.error(s"SSPStreamMicroBatchReader.commit() received an offset ($end) that did not " +
                s"originate with an instance of this class")

            val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

            if (offsetDiff < 0) {
              sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")

            // NOTE(review): in the visible lines this update is not inside `synchronized`,
            // unlike the reads of lastOffsetCommitted in planInputPartitions().
            lastOffsetCommitted = newOffset

          // Logs how many events were received and sets `stopped` so the receive() loop exits.
          // If a connection was obtained, it is handled inside a try/catch for IOException.
          // NOTE(review): the try/catch bodies are not visible in this excerpt.
          override def stop(): Unit = {
            log.warn(s"There is a total of $incomingEventCounter events that came in")
            stopped = true
            if (acsURLConn != null) {
              try {
              } catch {
                case e: IOException =>

          // Rebuilds an Offset from its JSON string form (body not visible in this excerpt).
          override def deserializeOffset(json: String): Offset = {
          // Schema of the rows this reader produces (body not visible in this excerpt;
          // presumably SSPStreamingSource.SCHEMA — confirm).
          override def readSchema(): StructType = {

          // Logs `msg` at the level selected by the debugLevel option ("warn", "info" or "debug").
          // Any other value hits the catch-all case, whose body is empty in this excerpt, so the
          // message is dropped.
          private def internalLog(msg:String): Unit = {
            debugLevel match {
              case "warn" => log.warn(msg)
              case "info" => log.info(msg)
              case "debug" => log.debug(msg)
              case _ =>

        // Constants for the custom source: the option keys read by SSPStreamMicroBatchReader,
        // plus a SCHEMA describing the JSON payload.
        object SSPStreamingSource {

          // Option keys (see the options.get(...) calls in SSPStreamMicroBatchReader).
          val HTTP_URL = "httpURL"
          val DEBUG_LEVEL = "debugLevel"
          val NUM_PARTITIONS = "numPartitions"
          val QUEUE_SIZE = "queueSize"

           // NOTE(review): the "instance" line closes four parentheses. That matches an opener
           // StructField("metric", StructType(Seq( which is absent here. As shown, the `result`
           // elements lack the `metric` wrapper, and "value" lands outside the element struct.
           // That does not match the schema printed in the log
           // (result.element.metric.{application,component,instance} and result.element.value).
           // A line was probably lost. Confirm.
           val SCHEMA = StructType(Seq(
                            StructField("status", StringType),
                            StructField("data", StructType(Seq(
                                StructField("resultType", StringType),
                                StructField("result", ArrayType(StructType(Array(
                                                         StructField("application", StringType),
                                                                         StructField("component", StringType),                                                                                                                                                          
                                                                       StructField("instance", StringType)))), 
                                    StructField("value", ArrayType(StringType))

        // One input partition: holds a slice of received records and creates an
        // SSPStreamBatchReader over it.
        // NOTE(review): declared as InputPartition[Row], but planInputPartitions() casts it to
        // InputPartition[InternalRow]. The cast is unchecked, so the mismatch likely surfaces as a
        // ClassCastException when Spark reads the rows. Confirm.
        class SSPStreamBatchTask(sspList:ListBuffer[StreamingQueryStatus]) extends InputPartition[Row] {
          override def createPartitionReader(): InputPartitionReader[Row] = new SSPStreamBatchReader(sspList)

        // Iterates over the records of one partition, exposing one Row per element.
        class SSPStreamBatchReader(sspList:ListBuffer[StreamingQueryStatus]) extends InputPartitionReader[Row] {
          // Index of the current element; -1 until next() is first called.
          private var currentIdx = -1

          override def next(): Boolean = {
            // Return true as long as the new index is in the seq.
            currentIdx += 1
            currentIdx < sspList.size

          // Converts the current element into a Row (the conversion is not visible in this excerpt).
          override def get(): Row = {
            val tweet = sspList(currentIdx)

          // No resources to release: the records are an in-memory list.
          override def close(): Unit = {}

Далее этот класс используется следующим образом:

    // Builds a streaming DataFrame from the custom source, passing the endpoint via the HTTP_URL option key.
    // NOTE(review): the .format(...) and .load() calls are not visible in this excerpt.
    val a = sparkSession.readStream
            .option(SSPStreamingSource.HTTP_URL, httpMemTotal)

            .option("checkpointLocation", "/home/localCheckpoint1") //local

Здесь возникает ошибка. Я пока не смог с ней разобраться :(

    18/11/29 13:33:28 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    18/11/29 13:33:28 WARN SSPStreamingSource: Inside createMicroBatchReader() ....
    18/11/29 13:33:28 WARN SSPStreamMicroBatchReader: Inside initialize ....
    18/11/29 13:33:28 WARN SSPStreamMicroBatchReader: Inside thread ....
    18/11/29 13:33:28 WARN SSPStreamMicroBatchReader: There is a total of 0 events that came in
    18/11/29 13:33:28 WARN SSPStreamMicroBatchReader: Inside run ....
    18/11/29 13:33:28 WARN SSPStreamMicroBatchReader: Inside recieve() ....
     |-- status: string (nullable = true)
     |-- data: struct (nullable = true)
     |    |-- resultType: string (nullable = true)
     |    |-- result: array (nullable = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- metric: struct (nullable = true)
     |    |    |    |    |-- application: string (nullable = true)
     |    |    |    |    |-- component: string (nullable = true)
     |    |    |    |    |-- instance: string (nullable = true)
     |    |    |    |-- value: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)

    18/11/29 13:33:30 INFO MicroBatchExecution: Starting [id = f15252df-96d8-45b4-a6db-83fd4c7aed71, runId = 65a6dc28-5eb4-468a-80c3-f547504689d7]. Use file:///home/localCheckpoint1 to store the query checkpoint.
    18/11/29 13:33:30 WARN SSPStreamingSource: Inside createMicroBatchReader() ....
    18/11/29 13:33:30 WARN SSPStreamMicroBatchReader: Inside initialize ....
    18/11/29 13:33:30 ERROR StreamingContext: Error starting the context, marking it as stopped
    java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
      at scala.Predef$.require(Predef.scala:224)
      at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:168)
      at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
      at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
      at myproject.spark.predictive_monitoring.predictmyproject$.run(predictmyproject.scala:99)
      at myproject.spark.predictive_monitoring.predictmyproject$.main(predictmyproject.scala:31)
      at myproject.spark.predictive_monitoring.predictmyproject.main(predictmyproject.scala)
    Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
      at scala.Predef$.require(Predef.scala:224)
      at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:168)
      at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
      at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
      at myproject.spark.predictive_monitoring.predictmyproject$.run(predictmyproject.scala:99)
      at myproject.spark.predictive_monitoring.predictmyproject$.main(predictmyproject.scala:31)
      at myproject.spark.predictive_monitoring.predictmyproject.main(predictmyproject.scala)
    18/11/29 13:33:30 INFO SparkContext: Invoking stop() from shutdown hook
    18/11/29 13:33:30 WARN SSPStreamMicroBatchReader: Inside thread ....
    18/11/29 13:33:30 WARN SSPStreamMicroBatchReader: Inside run ....
    18/11/29 13:33:30 WARN SSPStreamMicroBatchReader: Inside recieve() ....
    18/11/29 13:33:30 INFO MicroBatchExecution: Using MicroBatchReader [myproject.spark.predictive_monitoring.SSPStreamMicroBatchReader@74cc1ddc] from DataSourceV2 named 'myproject.spark.predictive_monitoring.SSPStreamingSource' [myproject.spark.predictive_monitoring.SSPStreamingSource@7e503c3]
    18/11/29 13:33:30 INFO SparkUI: Stopped Spark web UI at
    18/11/29 13:33:30 ERROR MicroBatchExecution: Query [id = f15252df-96d8-45b4-a6db-83fd4c7aed71, runId = 65a6dc28-5eb4-468a-80c3-f547504689d7] terminated with error
    java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
    This stopped SparkContext was created at:


    The currently active SparkContext was created at:


      at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
      at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:91)
      at org.apache.spark.sql.SparkSession.cloneSession(SparkSession.scala:256)
      at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:268)
      at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
    18/11/29 13:33:30 WARN SSPStreamMicroBatchReader: There is a total of 0 events that came in
    18/11/29 13:33:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    18/11/29 13:33:30 INFO MemoryStore: MemoryStore cleared
    18/11/29 13:33:30 INFO BlockManager: BlockManager stopped
    18/11/29 13:33:30 INFO BlockManagerMaster: BlockManagerMaster stopped
    18/11/29 13:33:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    18/11/29 13:33:31 INFO SparkContext: Successfully stopped SparkContext
