Как проверить, если RDD пуст с помощью потоковой передачи искры? - PullRequest
0 голосов
/ 27 февраля 2019

У меня следующий код pyspark, который я использую для чтения файлов журнала из журнала / каталога, а затем сохраняю результаты в текстовый файл только тогда, когда в нем есть данные ... другими словами, когда СДР не пуст.Но у меня есть проблемы с его реализацией.Я пробовал оба взять (1) и notempty.Так как это dstream rdd, мы не можем применить к нему rdd методы.Пожалуйста, дайте мне знать, если я что-то упустил.

conf = SparkConf().setMaster("local").setAppName("PysparkStreaming")
sc = SparkContext.getOrCreate(conf = conf)

ssc = StreamingContext(sc, 3)   #Streaming will execute in each 3 seconds
lines = ssc.textFileStream('/Users/rocket/Downloads/logs/')  #'logs/ mean directory name
audit = lines.map(lambda x: x.split('|')[3])
result = audit.countByValue()
#result.pprint()
#result.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
# Print the first ten elements of each RDD generated in this DStream to the console
if result.foreachRDD(lambda rdd: rdd.take(1)):
    result.pprint()
    result.saveAsTextFiles("/Users/rocket/Downloads/output","txt")
else:
    result.pprint()
    print("empty")

1 Ответ

0 голосов
/ 27 февраля 2019

Правильная структура будет

import uuid 

def process_batch(rdd):
    if not rdd.isEmpty():
        result.saveAsTextFiles("/Users/rocket/Downloads/output-{}".format(
          str(uuid.uuid4())
        ) ,"txt")


result.foreachRDD(process_batch)

Однако, как вы видите выше, для каждой партии требуется отдельный каталог, поскольку RDD API не имеет режима append.

И альтернативой может быть:

def process_batch(rdd):
    if not rdd.isEmpty():
       lines = rdd.map(str)
       spark.createDataFrame(lines, "string").save.mode("append").format("text").save("/Users/rocket/Downloads/output")
...