Twitter + Apache Kafka + Spark Structured Streaming not working
0 votes
/ 08 July 2020

I want to work through the sample code from GitHub (https://github.com/kaantas/spark-twitter-sentiment-analysis). I follow these steps:

  1. Started zkserver
  2. Started Kafka 2.5.0 (I am also using Apache Spark 3.0.0 and JDK 8); a quick broker connectivity check is sketched after this list
  3. Ran tweeetlistener.py (tweets start streaming in, and I can see them in the cmd window)
  4. Opened twitter_topic_avg_sentiment_val.py in Spyder, and it just shows the traceback below
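
For reference, a minimal connectivity check (my addition, not part of the repo), assuming the broker listens on the default localhost:9092 address that the scripts below use:

import pykafka

# Constructing the client fetches cluster metadata, so this line fails fast
# if the broker from steps 1-2 is not reachable on localhost:9092.
client = pykafka.KafkaClient("localhost:9092")
# Known topic names (as bytes); "twitter3" may only appear after the first
# produce, since Kafka auto-creates topics by default.
print(client.topics.keys())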

Note: I don't know anything about jars. If I need to use an external jar, please explain how? THANKS A LOT ...

Traceback (most recent call last):

  File "C:\Users\merha\Desktop\spark-twitter-sentiment-analysis-master\twitter_topic_avg_sentiment_val.py", line 40, in <module>
    query.awaitTermination()

  File "C:\Anaconda3\lib\site-packages\pyspark\sql\streaming.py", line 103, in awaitTermination
    return self._jsq.awaitTermination()

  File "C:\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)

  File "C:\Anaconda3\lib\site-packages\pyspark\sql\utils.py", line 137, in deco
    raise_from(converted)

  File "<string>", line 3, in raise_from

StreamingQueryException: org/apache/spark/kafka010/KafkaConfigUpdater
=== Streaming Query ===
Identifier: [id = f5dd9cb5-fcea-42ec-a20e-93a2ad233e1f, runId = 6cffdd89-3792-4500-a508-e4abc76425fb]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: INITIALIZING
Thread State: RUNNABLE
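
The org/apache/spark/kafka010/KafkaConfigUpdater message is typically a java.lang.NoClassDefFoundError surfacing through the streaming query: that class ships in spark-token-provider-kafka-0-10, which the spark-sql-kafka-0-10 connector pulls in transitively, and neither is bundled with Spark itself. A minimal sketch of attaching the connector from Python, assuming Spark 3.0.0 built against Scala 2.12 (adjust the coordinates to your build); it has to run before pyspark is first initialized:

import os

# Assumed coordinates for Spark 3.0.0 / Scala 2.12. Spark resolves the
# package and its transitive dependencies (including
# spark-token-provider-kafka-0-10, which contains KafkaConfigUpdater)
# from Maven at startup.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"
)

The same --packages flag can be passed to spark-submit directly; either way no jars need to be downloaded by hand.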

------------------ tweeetlistener.py ------------------

from tweepy import OAuthHandler, Stream  # OAuthHandler is used below but was not imported
from tweepy.streaming import StreamListener
import json
import twitter_config
import pykafka
from afinn import Afinn
import sys
from sys import exit

class TweetListener(StreamListener):
    def __init__(self):
        # Producer for the "twitter3" topic on the local broker.
        self.client = pykafka.KafkaClient("localhost:9092")
        self.producer = self.client.topics[bytes('twitter3', 'ascii')].get_producer()

    def on_data(self, data):
        try:
            json_data = json.loads(data)

            # Forward only the tweet text and its AFINN sentiment score.
            json_send_data = {}
            json_send_data['text'] = json_data['text']
            json_send_data['senti_val'] = afinn.score(json_data['text'])

            print(json_send_data['text'], " >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ", json_send_data['senti_val'])

            self.producer.produce(bytes(json.dumps(json_send_data), 'ascii'))
            return True
        except KeyError:
            # Stream control messages (e.g. delete notices) have no 'text' field.
            return True

    def on_error(self, status):
        print(status)
        return True  # keep the stream alive on errors

consumer_key = "xxxxxxxxxx"
consumer_secret = "xxxxxxxxxxx"
access_token = "xxxxxxxxxxxx"
access_secret = "xxxxxxxxxx"

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)

# create AFINN object for sentiment analysis
afinn = Afinn()

twitter_stream = Stream(auth, TweetListener())
twitter_stream.filter(languages=['en'], track=["big data"])
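
To confirm the listener above is actually publishing, a short consumer sketch (added here for debugging, assuming the same localhost:9092 broker and twitter3 topic) can be run in a separate terminal:

import pykafka

# Drain messages from "twitter3" to verify the producer above is working.
client = pykafka.KafkaClient("localhost:9092")
consumer = client.topics[b'twitter3'].get_simple_consumer(
    consumer_timeout_ms=5000  # stop iterating after 5 s with no new messages
)
for message in consumer:
    if message is not None:
        print(message.offset, message.value.decode('ascii'))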

------------------ twitter_topic_avg_sentiment_val.py ------------------

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import json
import sys
from pyspark.sql.types import *

def fun(avg_senti_val):
    try:
        if avg_senti_val < 0: return 'NEGATIVE'
        elif avg_senti_val == 0: return 'NEUTRAL'
        else: return 'POSITIVE'
    except TypeError:
        return 'NEUTRAL'

if __name__ == "__main__":

    schema = StructType([                                                                                          
        StructField("text", StringType(), True),
        StructField("senti_val", DoubleType(), True)    
    ])
    
    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()
    
    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "twitter3") \
        .option("startingOffsets", "earliest") \
        .load()

    kafka_df_string = kafka_df.selectExpr("CAST(value AS STRING)")

    tweets_table = kafka_df_string.select(from_json(col("value"), schema).alias("data")).select("data.*")
    sum_val_table = tweets_table.select(avg('senti_val').alias('avg_senti_val'))
    
    # udf = USER DEFINED FUNCTION
    udf_avg_to_status = udf(fun, StringType())

    # map the average of the senti_val column to a status label
    new_df = sum_val_table.withColumn("status", udf_avg_to_status("avg_senti_val"))

    # new_df is a streaming aggregation, so the console sink needs outputMode
    # "complete" (or "update") rather than the default "append".
    query = new_df.writeStream.outputMode("complete").format("console").option("truncate", "false").start()
    
    query.awaitTermination()