Я хочу поработать с примерами кода из GitHub (https://github.com/kaantas/spark-twitter-sentiment-analysis). Я выполняю следующие шаги:
- Запущен zkserver
- Запущена версия kafka 2.5.0 (также я использую apache spark 3.0.0 и jdk 8)
- Запущен tweeetlistener.py (твиты начинают транслироваться, я вижу их в окне cmd)
- Я открываю twitter_topic_avg_sentiment_val.py в Spyder и запускаю его, но он выводит только приведённый ниже текст ошибки
Примечание: я не разбираюсь в jar-файлах; если нужно использовать внешний jar, пожалуйста, объясните, как это сделать. БОЛЬШОЕ СПАСИБО ...
Traceback (most recent call last):
File "C:\Users\merha\Desktop\spark-twitter-sentiment-analysis-master\twitter_topic_avg_sentiment_val.py", line 40, in <module>
query.awaitTermination()
File "C:\Anaconda3\lib\site-packages\pyspark\sql\streaming.py", line 103, in awaitTermination
return self._jsq.awaitTermination()
File "C:\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Anaconda3\lib\site-packages\pyspark\sql\utils.py", line 137, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
StreamingQueryException: org/apache/spark/kafka010/KafkaConfigUpdater
=== Streaming Query ===
Identifier: [id = f5dd9cb5-fcea-42ec-a20e-93a2ad233e1f, runId = 6cffdd89-3792-4500-a508-e4abc76425fb]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
------------------ <<<<<<<<<<<<<<<<<< >>> ------------------------
from tweepy import Stream
from tweepy.streaming import StreamListener
import json
import twitter_config
import pykafka
from afinn import Afinn
import sys
from sys import exit
class TweetListener(StreamListener):
    """Stream listener that scores each tweet and publishes it to Kafka.

    Every message that carries a ``text`` field is turned into a JSON object
    with the keys ``text`` and ``senti_val`` and produced to the ``twitter3``
    topic of the broker at ``localhost:9092``.

    Relies on a module-level ``afinn`` object exposing ``score(text)``; it
    must be defined before the first message is delivered.
    """

    def __init__(self):
        # Let the base listener set up its own state before adding ours.
        super().__init__()
        self.client = pykafka.KafkaClient("localhost:9092")
        self.producer = self.client.topics[bytes('twitter3', 'ascii')].get_producer()

    def on_data(self, data):
        """Score one raw stream message and forward it to Kafka.

        Args:
            data: JSON-encoded message as received from the stream.

        Returns:
            Always ``True``.
        """
        try:
            json_data = json.loads(data)
            text = json_data['text']
            json_send_data = {'text': text, 'senti_val': afinn.score(text)}
            print(json_send_data['text'], " >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ", json_send_data['senti_val'])
            # json.dumps escapes non-ASCII characters by default, so encoding
            # the result as ASCII cannot fail on tweet text.
            self.producer.produce(bytes(json.dumps(json_send_data), 'ascii'))
        except KeyError:
            # Messages without a 'text' field are skipped silently.
            pass
        return True

    def on_error(self, status):
        """Print the reported error status and return ``True``."""
        print(status)
        return True
# OAuthHandler is used below but is not among the names imported at the top
# of this script; import it here so the script does not fail with NameError.
from tweepy import OAuthHandler

# Twitter API credentials (the values here look like placeholders).
consumer_key = "xxxxxxxxxx"
consumer_secret = "xxxxxxxxxxx"
access_token = "xxxxxxxxxxxx"
access_secret = "xxxxxxxxxx"

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)

# create AFINN object for sentiment analysis
# (TweetListener.on_data reads this module-level name).
afinn = Afinn()

# Start streaming English tweets that match the tracked phrase.
twitter_stream = Stream(auth, TweetListener())
twitter_stream.filter(languages=['en'], track=["big data"])
-------------- -------- << >>>>> ---------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import json
import sys
from pyspark.sql.types import *
def fun(avg_senti_val):
    """Map an average sentiment value to a status label.

    Returns 'NEGATIVE' for values below zero, 'NEUTRAL' for exactly zero and
    'POSITIVE' otherwise. A value that cannot be compared with a number
    (for example ``None``) yields 'NEUTRAL'.
    """
    try:
        if avg_senti_val < 0:
            label = 'NEGATIVE'
        elif avg_senti_val == 0:
            label = 'NEUTRAL'
        else:
            label = 'POSITIVE'
    except TypeError:
        # Non-comparable input falls back to the neutral label.
        label = 'NEUTRAL'
    return label
if __name__ == "__main__":
    # Layout of the JSON payload carried in each Kafka record value.
    schema = StructType([
        StructField("text", StringType(), True),
        StructField("senti_val", DoubleType(), True)
    ])

    # The Kafka source is not bundled with Spark. Without the connector on
    # the classpath the query fails with
    # "StreamingQueryException: org/apache/spark/kafka010/KafkaConfigUpdater".
    # spark.jars.packages makes Spark resolve the connector and its
    # transitive dependencies at start-up. The coordinates must match the
    # installed Spark/Scala build (Spark 3.0.0 / Scala 2.12 here).
    # NOTE(review): getOrCreate() reuses a running session, so restart the
    # Python console if a session was already started without this setting.
    spark = (
        SparkSession.builder
        .appName("TwitterSentimentAnalysis")
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0")
        .getOrCreate()
    )

    # Read the 'twitter3' topic from the beginning as a streaming DataFrame.
    kafka_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "twitter3")
        .option("startingOffsets", "earliest")
        .load()
    )

    # Kafka delivers the value as binary; cast it to a string and parse it.
    kafka_df_string = kafka_df.selectExpr("CAST(value AS STRING)")
    tweets_table = kafka_df_string.select(from_json(col("value"), schema).alias("data")).select("data.*")

    # Average of the senti_val column over all tweets.
    sum_val_table = tweets_table.select(avg('senti_val').alias('avg_senti_val'))

    # udf = USER DEFINED FUNCTION
    udf_avg_to_status = udf(fun, StringType())

    # Average of the senti_val column mapped to a status column.
    new_df = sum_val_table.withColumn("status", udf_avg_to_status("avg_senti_val"))

    # NOTE(review): new_df is built above but the query below writes the raw
    # JSON strings, not the aggregated status — confirm this is intended.
    query = kafka_df_string.writeStream.format("console").option("truncate", "false").start()
    query.awaitTermination()