Я работал над простым примером подсчета слов SparkStreaming, чтобы подсчитать количество слов в текстовых данных, полученных от сервера данных, прослушивающего сокет TCP.
Я хотел бы проверить, является ли пакет из потокового источника пустым или нет, прежде чем сохранять содержимое каждого преобразования в текстовые файлы. В настоящее время я использую Spark Shell. Это мой код
Я попробовал этот код, и он отлично работает, не проверяя, пусто ли пакет:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines.saveAsTextFiles("/stream_test/testLine.txt")
val words = lines.flatMap(_.split(" "))
words.saveAsTextFiles("/stream_test/testWords.txt")
val pairs = words.map((_, 1))
pairs.saveAsTextFiles("/stream_test/testPairs.txt")
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.saveAsTextFiles("/stream_test/testWordsCounts.txt")
wordCounts.print()
ssc.start()
Я пытался использовать foreachRDD
, но выдает ошибку error: value saveAsTextFiles is not a member of org.apache.spark.rdd.RDD[String]
Это мой код
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(rdd => {
if(!rdd.partitions.isEmpty)
{
lines.saveAsTextFiles("/stream_test/testLine.txt")
val words = lines.flatMap(_.split(" "))
words.saveAsTextFiles("/stream_test/testWords.txt")
val pairs = words.map((_, 1))
pairs.saveAsTextFiles("/stream_test/testPairs.txt")
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.saveAsTextFiles("/stream_test/testWordsCounts.txt")
wordCounts.print()
}
})
ssc.start()
I need to to check if the batch from streaming source is empty or not before I save the content text files. I appreciate your help