Почему потоковая передача искрового сокета не принимает вложенные значения json? - PullRequest
0 голосов
/ 26 февраля 2020

Я пытался реализовать pyspark socketstreaming. Но я столкнулся с ошибкой. Когда я отправляю текст, он работает, но когда я отправляю на место, он не работает. Тип текста - строка, тип места - также строка. И я знаю, что сокет принимает только байтовый объект. Поэтому я попытался закодировать строковый объект. Я перепробовал все, но не смог прийти к выводу.

 def send_tweets_to_spark(http_resp, tcp_connection):
        for line in http_resp.iter_lines():
            try:
                print("-------------------------------------------")
                print(line)
                print(type(line))
                full_tweet = json.loads(line)
                print(full_tweet)
                print(type(full_tweet))
                text=full_tweet['text']
                print(text)
                print(type(text))
                place=full_tweet['place']['full_name']
                print(place)
                print(type(place))
                print ("------------------------------------------")
                tcp_connection.send(place.encode('utf-8'))
                #When we send to text it works but when we send to place it's not working.
                #tcp_connection.send(text.encode('utf-8'))
            except:
                e = sys.exc_info()[0]
                print("Error: %s" % e)

Что касается Spark, я думаю, что не ошибся.

 # create spark configuration
    conf = SparkConf()
    conf.setAppName("TwitterStreamApp")
    # create spark instance with the above configuration
    sc = SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    # creat the Streaming Context from the above spark context with window size 2 seconds
    ssc = StreamingContext(sc, 2)
    # setting a checkpoint to allow RDD recovery
    ssc.checkpoint("checkpoint_TwitterApp")
    # read data from port 60127
    dataStream = ssc.socketTextStream("localhost",60127)
    lines=dataStream.window(20)
    lines.pprint()
    # start the streaming computation
    ssc.start()
    # wait for the streaming to finish
    ssc.awaitTermination()
...