Я пытаюсь транслировать твиты, используя pyspark и библиотеку tweepy, чтобы получить десятку лучших твитов, основанных на количестве ретвитов и лайков.
Первый шаг - потоковая передача твитов с использованием твипи.и это код:
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
consumer_key = 'consumer_key'
consumer_secret = 'secret_key'
access_token = 'token_key'
access_secret = 'access_secret_key'
class TweetsListener(StreamListener):
def __init__(self, csocket):
self.client_socket = csocket
def on_data(self, data):
try:
msg = json.loads( data )
print( msg['text'].encode('utf-8') )
self.client_socket.send(
msg['text'].encode('utf-8') )
return True
except BaseException as e:
print("Error on_data: %s" % str(e))
return True
def on_error(self, status):
print(status)
return True
def sendData(c_socket):
auth = OAuthHandler(consumer_key,
consumer_secret)
auth.set_access_token(access_token,
access_secret)
twitter_stream = Stream(auth,
TweetsListener(c_socket))
twitter_stream.filter(track=
['iphone'],languages=["en"])
if __name__ == "__main__":
s = socket.socket()
host = "192.168.0.12"
port = 5555
s.bind((host, port))
print("Listening on port: %s" % str(port))
s.listen(5)
c, addr = s.accept()
print( "Received request from: " + str( addr ) )
sendData( c )
Во-вторых, это мой код pyspark, который я запускаю в блокноте jupyter.Сначала код потоковой передачи:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from pyspark.sql import HiveContext
sc
ssc = StreamingContext(sc, 10 )
sqlContext = SQLContext(sc)
socket_stream = ssc.socketTextStream("192.168.0.12", 5555)
lines = socket_stream.window( 20 )
from collections import namedtuple
fields = ("tag", "count" )
Tweet = namedtuple( 'Tweet', fields)
(lines.flatMap(lambda text: text.split(" ")).filter(lambda word: word.lower().startswith("#"))
.map( lambda word: ( word.lower(), 1))
.reduceByKey( lambda a, b: a + b)
.map(lambda rec: Tweet(rec[0], rec[1]))
.foreachRDD(lambda rdd: rdd.toDF().sort(desc("count")).limit(10).registerTempTable("tweets") ))
ssc.start()
А затем несколько кодов тестирования и печати:
import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
count = 0
while count < 10:
time.sleep( 3 )
top_10_tweets = sqlContext.sql( 'Select tag, count from tweets' )
top_10_df = top_10_tweets.toPandas()
display.clear_output(wait=True) #Clears the output, if a plot exists.
sns.plt.figure( figsize = ( 10, 8 ) )
sns.barplot( x="count", y="tag", data=top_10_df)
sns.plt.show()
count = count + 1
, но я получил эту ошибку, когда достигну последней ячейки, которая начинается с count = 0 доконец:
---------------------------
Py4JJavaError
Traceback (most recent call last)
/usr/local/Cellar/apache-/usr/local/Cellar/apache-
spark/2.4.0/libexec/python/pyspark/sql/utils.py
in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except
py4j.protocol.Py4JJavaError as e:
/usr/local/Cellar/apache-
spark/2.4.0/libexec/python/lib/py4j-0.10.7-
src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client,
target_id, name)
327 "An error occurred
while calling {0}{1}{2}.\n".
--> 328 format(target_id,
".", name), value)
329 else:
AnalysisException: 'Table or view not
found: tweets; line 1 pos 23'
есть идеи, как это решить?