Я работаю над простым примером подсчета слов SparkStreaming для подсчета количества слов в текстовых данных, полученных от сервера данных, прослушивающего сокет TCP.Я хотел бы сохранить содержимое каждого Dstream, которые не являются пустыми, в существующий текстовый файл.В настоящее время я использую Spark Shell.Это мой код
Я пробовал этот код, и он работает, но он перезаписывает текущий файл:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD{ rdd => if (!rdd.isEmpty)
//.% to check if the Dstream is empty or not
{
rdd.saveAsTextFile("/stream_test/testLine.txt")
}
}
ssc.start()
Я ценю вашу ценную помощь