Spark Streaming / Запись в CSV или TEXT - PullRequest
0 голосов
/ 01 мая 2020

В настоящее время я работаю над финальным проектом моего курса Streaming, но у меня много проблем с преобразованием данных из RDD в DataFrame и возможностью записи CSV или TXT.

I Я транслирую твиты с помощью API, а затем использую Spark Streaming, «прослушивая» определенный порт.

Это мой код:

    from pyspark import SparkConf,SparkContext 
    from pyspark.streaming import StreamingContext 
    from pyspark.sql import Row,SQLContext
    import sys
    import requests


    def get_sql_context_instance(spark_context):
        if ('sqlContextSingletonInstance' not in globals()):
            globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) 
        return globals()['sqlContextSingletonInstance']

    def my_funk(time, rdd):
        print("----------- %s -----------" % str(time))
        try:
            sql_context = get_sql_context_instance(rdd.context)
            row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
            hashtags_df = sql_context.createDataFrame(row_rdd)
            hashtag_counts_df = sql_context.sql("select * from lines") 
            hashtag_counts_df.show()
            hashtag_counts_df.saveAsTextFile("/data/home/fjvigo/output/file1.txt")
        except:
            e = sys.exc_info()[0] 
            print("Error: %s" % e)


    # create spark configuration
    conf = SparkConf() 
    conf.setAppName("TwitterStreamApp")

    # create spark context with the above configuration 
    sc = SparkContext(conf=conf) 
    sc.setLogLevel("ERROR")
    # create the Streaming Context from the above spark context with interval size 2 seconds 
    ssc = StreamingContext(sc, 2)

    # setting a checkpoint to allow RDD recovery 

    ssc.checkpoint("my_checkpoint")

    # read data from the port 

    lines = ssc.socketTextStream("localhost", 7890)

    lines.pprint()

    lines.foreachRDD(my_funk)

    # start the streaming computation 
    ssc.start()
    # wait for the streaming to finish 
    ssc.awaitTermination()
...