Почему я могу отправлять только переменную «текст» через сокет из Tweepy Listener со структурированной потоковой передачей из Spark? - PullRequest
0 голосов
/ 14 ноября 2018

Как вы могли прочитать в заголовке, у меня странная проблема с программой, написанной в pyspark.

У меня есть 2 программы, которые общаются через локальный сокет. Целью программы является отправка данных из твиттера. Моя программа работает нормально, когда я отправляю только переменные text. Но если я хочу отправить другую переменную, я не получаю никаких данных. Даже если я отправляю данные из переменной text и другой, например id, она не работает.

Ниже мой код:

Client.py:

import os
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json

#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

def __init__(self, csocket):
    self.client_socket = csocket

def on_data(self, data):
    try:
        tweet = str(json.loads(data)['text']).encode('utf-8')   #it works
        #tweet = str(json.loads(data)['id']).encode('utf-8')   #it doesn't work
        #mykeys = ['text','id']
        #tweet = str([json.loads(data)[x] for x in mykeys]).encode('utf-8')  #it doesn't work
        print(tweet)
        self.client_socket.sendall(tweet)
        return True
    except BaseException as e:
        print("Error on_data: %s" % str(e))
    return True

def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    twitter_stream = Stream(auth, MyStreamListener(c_socket))
    twitter_stream.filter(track=['#fakenews'])

if __name__ == "__main__":
    s = socket.socket()     # Create a socket object
    host = "localhost"      # Get local machine name
    port = 5555             # Reserve a port for your service.
    s.bind((host, port))    # Bind to the port

    print("Listening on port: %s" % str(port))

    s.listen(1)                 # Now wait for client connection.
    c, addr = s.accept()        # Establish connection with client.

    print( "Received request from: " + str( addr ) )

    sendData(c)

Server.py:

#Need to launch Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, desc, decode
from pyspark.sql.types import *
import json

spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

IP = "localhost"
Port = 5555

lines = spark.readStream.format("socket").option("host", IP).option("port", Port).load()

query = lines.writeStream.outputMode("append").format("memory").queryName("tmpTable")

query.start()

Код для просмотра временной таблицы:

spark.sql("select * from tmpTable").show(n=50)

Как я могу это исправить?

...