Я пытаюсь создать привет мир в Spark Streaming, используя Scala. Вместо чтения строки из сокета или файла, я хочу прочитать ее из потока, который генерирует строку каждую 1 секунду. Мой контекст SparkStream настроен на использование мини-пакетов по 5 секунд. Итак, я бы ожидал 5 раз строку, которую я генерирую внутри потока. Тем не менее, я считаю только один раз строки, которые я генерировал каждые 5 секунд. Есть ли другой способ реализовать функцию источника в Spark Streaming?
package org.sense.spark.app
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
object TestStreamCombineByKey {
def main(args: Array[String]): Unit = {
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("LocalWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
// Create a DStream that will connect to hostname:port, like localhost:9999
// val lines = ssc.socketTextStream("localhost", 9999)
val queue = new scala.collection.mutable.Queue[RDD[String]]
val thread = new Thread("pool data source") {
override def run() {
while (true) {
queue.enqueue(ssc.sparkContext.parallelize(List("to be or not to be , that is the question")))
Thread.sleep(1000)
}
}
}
thread.start()
val lines = ssc.queueStream(queue)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
// val wordCounts = pairs.reduceByKey(_ + _)
val wordCounts = pairs.combineByKey(
(v) => (v, 1), //createCombiner
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //mergeValue
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // mergeCombiners
new HashPartitioner(3)
)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
}
output:
-------------------------------------------
Time: 1570784580000 ms
-------------------------------------------
(or,(1,1))
(the,(1,1))
(not,(1,1))
(is,(1,1))
(that,(1,1))
(be,(2,2))
(question,(1,1))
(to,(2,2))
(,,(1,1))
-------------------------------------------
Time: 1570784585000 ms
-------------------------------------------
(or,(1,1))
(the,(1,1))
(not,(1,1))
(is,(1,1))
(that,(1,1))
(be,(2,2))
(question,(1,1))
(to,(2,2))
(,,(1,1))