Как добавить содержимое Dstream или RDD в существующий выходной файл - Spark Streaming - PullRequest
1 голос
/ 09 мая 2019

Я работаю над простым примером подсчета слов SparkStreaming для подсчета количества слов в текстовых данных, полученных от сервера данных, прослушивающего сокет TCP.Я хотел бы сохранить содержимое каждого Dstream, которые не являются пустыми, в существующий текстовый файл.В настоящее время я использую Spark Shell.Это мой код

Я пробовал этот код, и он работает, но он перезаписывает текущий файл:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(2))

val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

lines.foreachRDD{ rdd => if (!rdd.isEmpty) 
//.% to check if the Dstream is empty or not  
{
rdd.saveAsTextFile("/stream_test/testLine.txt")
}
}

ssc.start()

Я ценю вашу ценную помощь

...