rdd.first () не выдает ошибку, но rdd.collect () делает - PullRequest
0 голосов
/ 19 мая 2018

Я работаю в pyspark и у меня есть следующий код, где я обрабатываю твит и создаю RDD с user_id и текстом.Ниже приведен код

"""
# Construct an RDD of (user_id, text) here.
"""

import json

def safe_parse(raw_json):
    try:
        json_object = json.loads(raw_json)    
        if 'created_at' in json_object:
            return json_object
        else:
            return;
    except ValueError as error:
        return;


def get_usr_txt (line):

    tmp = safe_parse (line)

    return ((tmp.get('user').get('id_str'),tmp.get('text')));

usr_txt = text_file.map(lambda line: get_usr_txt(line))
print (usr_txt.take(5))

и вывод выглядит нормально (как показано ниже)

[('470520068', "I'm voting 4 #BernieSanders bc he doesn't ride a CAPITALIST PIG adorned w/ #GoldmanSachs $. SYSTEM RIGGED CLASS WAR "), ('2176120173', "RT @TrumpNewMedia: .@realDonaldTrump #America get out & #VoteTrump if you don't #VoteTrump NOTHING will change it's that simple!\n#Trump htt…"), ('145087572', 'RT @Libertea2012: RT TODAY: #Colorado’s leading progressive voices to endorse @BernieSanders! #Denver 11AM - 1PM in MST CO State Capitol…'), ('23047147', '[VID] Liberal Tears Pour After Bernie Supporter Had To Deal With Trump Fans '), ('526506000', 'RT @justinamash: .@tedcruz is the only remaining candidate I trust to take on what he correctly calls the Washington Cartel. ')]

Однако, как только я это сделаю

print (usr_txt.count())

, я получаюошибка как ниже

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-60-9dacaf2d41b5> in <module>()    
  8 usr_txt = text_file.map(lambda line: get_usr_txt(line))
  9 #print (usr_txt.take(5))
---> 10 print (usr_txt.count())    
 11 

/usr/local/spark/python/pyspark/rdd.py in count(self)
   1054         3
   1055         """
-> 1056         return self.mapPartitions(lambda i: [sum(1 for _ in     i)]).sum()
   1057 
   1058     def stats(self):

Чего мне не хватает?Правильно ли создан СДР?или есть что-то еще?как мне это исправить?

1 Ответ

0 голосов
/ 19 мая 2018

Вы вернули метод None из safe_parse, когда в разобранной строке json отсутствует элемент созданный_каталог или когда произошла ошибка при разборе.Это создало ошибку при получении элементов из проанализированного jsons в (tmp.get('user').get('id_str'),tmp.get('text')).Это привело к возникновению ошибки

Решение состоит в том, чтобы проверить None в get_usr_txt метод

def get_usr_txt (line):
    tmp = safe_parse(line)
    if(tmp != None):
        return ((tmp.get('user').get('id_str'),tmp.get('text')));

Теперь вопрос заключается в том, почему print (usr_txt.take(5)) показал результат и print (usr_txt.count())вызвало ошибку

Это потому, что usr_txt.take(5) рассматривал только первые пять первых, а не остальные и не имел дело с типом данных None.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...