Я пытаюсь провести анализ настроений в реальном времени для твиттера. Я могу отправлять потоковые данные в Twitter потребителю. Сейчас я могу анализировать настроения при потоковой передаче данных из твиттера потребителю. Так что, в принципе, я могу сразу сделать анализ потоковых данных. Но теперь я не знаю, как отправить необработанные данные из твиттера с проанализированным результатом одновременно потребителю.
Я использую kafka-python и pyspark. Я попытался создать фрейм данных проанализированного результата, который я сделал в коде производителя
python: This is how i get the streaming data and do the sentiment
analysis
class StdOutListener(StreamListener):
def __init__(self, producer):
self.producer_obj = producer
def on_data(self, data):
try:
self.producer_obj.send("twitterstreamingdata", data.encode('utf-8'))
global initime
t = int(calctime(initime))
all_data = json.loads(data)
tweet = all_data["text"]
# username=all_data["user"]["screen_name"]
tweet = " ".join(re.findall("[a-zA-Z]+", tweet))
blob = TextBlob(tweet.strip())
global positive
global negative
global compound
global count
count = count + 1
senti = 0
for sen in blob.sentences:
senti = senti + sen.sentiment.polarity
if sen.sentiment.polarity >= 0:
positive = positive + sen.sentiment.polarity
else:
negative = negative + sen.sentiment.polarity
compound = compound + senti
print(count)
print(tweet)
print(senti)
print(t)
print(str(positive) + ' ' + str(negative) + ' ' + str(compound))
except BaseException as e:
print("Error on_data: %s" % str(e))
return True
Поскольку я не знаю, как отправлять проанализированные данные, мне удалось получить только необработанные данные Twitter.