В настоящее время я работаю над финальным проектом моего курса Streaming, но у меня много проблем с преобразованием данных из RDD в DataFrame и возможностью записи CSV или TXT.
I Я транслирую твиты с помощью API, а затем использую Spark Streaming, «прослушивая» определенный порт.
Это мой код:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
def get_sql_context_instance(spark_context):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
return globals()['sqlContextSingletonInstance']
def my_funk(time, rdd):
print("----------- %s -----------" % str(time))
try:
sql_context = get_sql_context_instance(rdd.context)
row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
hashtags_df = sql_context.createDataFrame(row_rdd)
hashtag_counts_df = sql_context.sql("select * from lines")
hashtag_counts_df.show()
hashtag_counts_df.saveAsTextFile("/data/home/fjvigo/output/file1.txt")
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# create spark context with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with interval size 2 seconds
ssc = StreamingContext(sc, 2)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("my_checkpoint")
# read data from the port
lines = ssc.socketTextStream("localhost", 7890)
lines.pprint()
lines.foreachRDD(my_funk)
# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()