Я пытался реализовать pyspark socketstreaming. Но я столкнулся с ошибкой. Когда я отправляю текст, он работает, но когда я отправляю на место, он не работает. Тип текста - строка, тип места - также строка. И я знаю, что сокет принимает только байтовый объект. Поэтому я попытался закодировать строковый объект. Я перепробовал все, но не смог прийти к выводу.
def send_tweets_to_spark(http_resp, tcp_connection):
for line in http_resp.iter_lines():
try:
print("-------------------------------------------")
print(line)
print(type(line))
full_tweet = json.loads(line)
print(full_tweet)
print(type(full_tweet))
text=full_tweet['text']
print(text)
print(type(text))
place=full_tweet['place']['full_name']
print(place)
print(type(place))
print ("------------------------------------------")
tcp_connection.send(place.encode('utf-8'))
#When we send to text it works but when we send to place it's not working.
#tcp_connection.send(text.encode('utf-8'))
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
Что касается Spark, я думаю, что не ошибся.
# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# create spark instance with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# creat the Streaming Context from the above spark context with window size 2 seconds
ssc = StreamingContext(sc, 2)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")
# read data from port 60127
dataStream = ssc.socketTextStream("localhost",60127)
lines=dataStream.window(20)
lines.pprint()
# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()