PYSPARK: Почему я получаю сообщение об ошибке при чтении с брокера kafka через pyspark? - PullRequest
0 голосов
/ 11 июля 2019

Я читаю поток Твиттера из моей темы Kafka при преобразовании его в JSON в коде Pyspark, данные пропадают.

Предоставление кода ниже

Код читает поток Twitter из темы Кафки и конвертирует его в формат JSON. При доступе к твиту ['user'] происходит ошибка ключа (индексы должны быть целыми числами), когда твит [0] получает первый символ сообщения.

from __future__ import print_function

import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers,topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: json.loads(x[1]))
    status=lines.map(lambda tweets: tweets['user']['screen_name'])
    #status.pprint()
    status.pprint()
    #status.map(lambda tweet: tweet['created_at']).pprint()
    #counts = lines.flatMap(lambda line: line.split(" ")) \
    #    .filter(lambda word: word.lower().startswith('#')) \
    #    .map(lambda word: (word.lower(), 1)) \
    #    .reduceByKey(lambda a, b: a+b)
    #counts.pprint()

    ssc.start()
    ssc.awaitTermination()

Получение этого вывода после преобразования сообщения Кафки в JSON

{u'quote_count ': 0, u'contributors': нет, utruncated ': False, u'text': u'RT @hotteaclout: @TeenChoiceFOX мой голос #TeenChoice за #ChoiceActionMovieActor является Крисом Эвансом, u'is_quote_status ': False, u'in_reply_to_status_id': Нет, u areply_count ': 0, u'id': 1149313606304976896, .....} ...

Фактическое сообщение:

{"creat_at": "Чт 11 июля 13:44:55 +0000 2019", "id": 1149313623363338241, "id_str": "1149313623363338241", "text": "RT @alisonpool_: Легит думал, что это Майк Вазовский на секунду LMFAO https://t.co/DMzMtOfW2I","source":"\u003ca href = \ "http://twitter.com/download/iphone\" ....}

1 Ответ

0 голосов
/ 11 июля 2019

Хорошо, я решил, это была проблема с кодировкой.Просто

json.loads(tweets.encode('utf-8'))

не будет работать, нам нужно указать кодировку файла, чтобы все вызываемые им файлы применяли одну и ту же кодировку.

import sys 
reload(sys)
sys.setdefaultencoding('utf-8')

Добавьте в нее код выше.

...