Я читаю поток Твиттера из моей темы Kafka при преобразовании его в JSON в коде Pyspark, данные пропадают.
Предоставление кода ниже
Код читает поток Twitter из темы Кафки и конвертирует его в формат JSON.
При доступе к твиту ['user'] происходит ошибка ключа (индексы должны быть целыми числами), когда твит [0] получает первый символ сообщения.
from __future__ import print_function
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
sys.exit(-1)
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers,topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: json.loads(x[1]))
status=lines.map(lambda tweets: tweets['user']['screen_name'])
#status.pprint()
status.pprint()
#status.map(lambda tweet: tweet['created_at']).pprint()
#counts = lines.flatMap(lambda line: line.split(" ")) \
# .filter(lambda word: word.lower().startswith('#')) \
# .map(lambda word: (word.lower(), 1)) \
# .reduceByKey(lambda a, b: a+b)
#counts.pprint()
ssc.start()
ssc.awaitTermination()
Получение этого вывода после преобразования сообщения Кафки в JSON
{u'quote_count ': 0, u'contributors': нет, utruncated ': False, u'text': u'RT @hotteaclout: @TeenChoiceFOX мой голос #TeenChoice за #ChoiceActionMovieActor является Крисом Эвансом, u'is_quote_status ': False, u'in_reply_to_status_id': Нет, u areply_count ': 0, u'id': 1149313606304976896, .....}
...
Фактическое сообщение:
{"creat_at": "Чт 11 июля 13:44:55 +0000 2019", "id": 1149313623363338241, "id_str": "1149313623363338241", "text": "RT @alisonpool_: Легит думал, что это Майк Вазовский на секунду LMFAO https://t.co/DMzMtOfW2I","source":"\u003ca href = \ "http://twitter.com/download/iphone\" ....}