Как вы могли прочитать в заголовке, у меня странная проблема с программой, написанной в pyspark.
У меня есть 2 программы, которые общаются через локальный сокет. Целью программы является отправка данных из твиттера. Моя программа работает нормально, когда я отправляю только переменные text
. Но если я хочу отправить другую переменную, я не получаю никаких данных. Даже если я отправляю данные из переменной text
и другой, например id
, она не работает.
Ниже мой код:
Client.py:
import os
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
def __init__(self, csocket):
self.client_socket = csocket
def on_data(self, data):
try:
tweet = str(json.loads(data)['text']).encode('utf-8') #it works
#tweet = str(json.loads(data)['id']).encode('utf-8') #it doesn't work
#mykeys = ['text','id']
#tweet = str([json.loads(data)[x] for x in mykeys]).encode('utf-8') #it doesn't work
print(tweet)
self.client_socket.sendall(tweet)
return True
except BaseException as e:
print("Error on_data: %s" % str(e))
return True
def sendData(c_socket):
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
twitter_stream = Stream(auth, MyStreamListener(c_socket))
twitter_stream.filter(track=['#fakenews'])
if __name__ == "__main__":
s = socket.socket() # Create a socket object
host = "localhost" # Get local machine name
port = 5555 # Reserve a port for your service.
s.bind((host, port)) # Bind to the port
print("Listening on port: %s" % str(port))
s.listen(1) # Now wait for client connection.
c, addr = s.accept() # Establish connection with client.
print( "Received request from: " + str( addr ) )
sendData(c)
Server.py:
#Need to launch Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, desc, decode
from pyspark.sql.types import *
import json
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
IP = "localhost"
Port = 5555
lines = spark.readStream.format("socket").option("host", IP).option("port", Port).load()
query = lines.writeStream.outputMode("append").format("memory").queryName("tmpTable")
query.start()
Код для просмотра временной таблицы:
spark.sql("select * from tmpTable").show(n=50)
Как я могу это исправить?