Как реализовать функцию источника String для потоковой передачи Spark? - PullRequest
0 голосов
/ 11 октября 2019

Я пытаюсь создать привет мир в Spark Streaming, используя Scala. Вместо чтения строки из сокета или файла, я хочу прочитать ее из потока, который генерирует строку каждую 1 секунду. Мой контекст SparkStream настроен на использование мини-пакетов по 5 секунд. Итак, я бы ожидал 5 раз строку, которую я генерирую внутри потока. Тем не менее, я считаю только один раз строки, которые я генерировал каждые 5 секунд. Есть ли другой способ реализовать функцию источника в Spark Streaming?

package org.sense.spark.app

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}

object TestStreamCombineByKey {
  def main(args: Array[String]): Unit = {

    // Create a local StreamingContext with two working thread and batch interval of 1 second.
    // The master requires 2 cores to prevent from a starvation scenario.
    val conf = new SparkConf().setMaster("local[2]").setAppName("LocalWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Create a DStream that will connect to hostname:port, like localhost:9999
    // val lines = ssc.socketTextStream("localhost", 9999)
    val queue = new scala.collection.mutable.Queue[RDD[String]]
    val thread = new Thread("pool data source") {
      override def run() {
        while (true) {
          queue.enqueue(ssc.sparkContext.parallelize(List("to be or not to be , that is the question")))
          Thread.sleep(1000)
        }
      }
    }
    thread.start()

    val lines = ssc.queueStream(queue)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    // val wordCounts = pairs.reduceByKey(_ + _)
    val wordCounts = pairs.combineByKey(
      (v) => (v, 1), //createCombiner
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //mergeValue
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // mergeCombiners
      new HashPartitioner(3)
    )

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}

output:

-------------------------------------------
Time: 1570784580000 ms
-------------------------------------------
(or,(1,1))
(the,(1,1))
(not,(1,1))
(is,(1,1))
(that,(1,1))
(be,(2,2))
(question,(1,1))
(to,(2,2))
(,,(1,1))
-------------------------------------------
Time: 1570784585000 ms
-------------------------------------------
(or,(1,1))
(the,(1,1))
(not,(1,1))
(is,(1,1))
(that,(1,1))
(be,(2,2))
(question,(1,1))
(to,(2,2))
(,,(1,1))
...