Как сохранить потоковую искру на локальном ПК и HDF? - PullRequest
0 голосов
/ 22 марта 2019

пытался получить эти данные в потоковом режиме и не смог сохранить эти данные в виде кортежей на локальном диске или в формате hdf.из pyspark импорт SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    def process(RDD):
        #RDD.pprint()
        kvs2=RDD.map()
        kvs2.saveAsTextFiles('path')

    #kvs.foreachRDD(lambda x: process(x))
    #kvs1=kvs.map(lambda x: x)
    kvs.pprint()

    kvs.saveAsTextFiles('path','txt')

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

1 Ответ

1 голос
/ 23 марта 2019

В этой строке:

 kvs.saveAsTextFiles('path','txt')

Вы сохраняете необработанный поток, а не поток с кортежами.Хранить по счетам вместо:

 counts.saveAsTextFiles('path','txt')

Интересно, файлы, сохраненные на рабочих узлах в каталоге, указанном в 'path'.

Сохранение в HDFS неподдерживается PySpark API, как и в последней версии, другие языки имеют saveAsHadoopFiles .Ссылка на документ .

...