Как проанализировать несколько объектов json происходит из потока метода tweepy on_data в postgresql в Python? - PullRequest
0 голосов
/ 01 мая 2019

Я пытаюсь проанализировать данные из нескольких объектов JSON, возвращенных методом tweepy on_data (каждая строка потока данных содержит один объект JSON, каждый поток данных содержит от одной до нескольких строк), и отправить его в PostgreSQL DB, но он всегда не может вставить запись в таблицу с сообщением TypeError ('индексы списка должны быть целыми числами, а не str).

Я не уверен, но я думаю, что проблема находится в коде, где я пытаюсь data.splitlines() и читать / обрабатывать каждую строку в PostgreSQL. Однако, когда я пытался распечатать данные на консоли, он всегда возвращает несколько строк данных и не может обработать их одну за другой.

Я настраиваю скрипт Python для сбора данных потокового твиттера с помощью tweepy и пытаюсь сохранить данные для Apache Spark и PostgreSQL DB, чтобы я мог загрузить их в Grafana.

Я также попробовал несколько решений из здесь , здесь и здесь . Однако, похоже, что проблема все еще существует.

def on_data(self, data):
        (width, height) = getTerminalSize()
        try:
            # here data comes in a multiple json object each in a line; but when i'm trying to read it line by line using splitlines() and for loop for each line it still return multiple lines so that i can't isert into the postgresql table
            lines = data.splitlines()
            for line in lines:
                d = json.loads(line)                  
                # processing the data
                if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
                    teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
                    print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
                    print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
                    print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
                    # send data to socket for processing in spark
                    self.client_socket.send(data)
                    # insert record to table
                    try:
                        conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                        cursor = conn.cursor()
                        # insert ops
                        item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
                        sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                        cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
                        conn.commit()
                        count = cursor.rowcount
                        print (count, " Record inserted successfully into table")
                    except (Exception, psycopg2.Error) as error:
                        if(conn):
                            print("Failed to insert record into table", error)
                    finally:
                        #closing database connection.
                        if(conn):
                            cursor.close()
                            conn.close()
                            print(" [*] PostgreSQL connection is closed")
                    print console_colors.GREY+ "_" * width + console_colors.ENDC

                elif 'extended_tweet' not in d and 'RT @' not in d['text'].encode('utf-8'):
                    teks_twit = str(d['text'].encode('utf-8'))
                    print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
                    print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
                    print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
                    # send data to socket for processing in spark
                    self.client_socket.send(data)
                    # insert record to table
                    try:
                        conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                        cursor = conn.cursor()
                        # insert ops
                        item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
                        sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                        cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
                        conn.commit()
                        count = cursor.rowcount
                        print (count, " Record inserted successfully into table")
                    except (Exception, psycopg2.Error) as error :
                        if(conn):
                            print("Failed to insert record into table", error)
                    finally:
                        #closing database connection.
                        if(conn):
                            cursor.close()
                            conn.close()
                            print(" [*] PostgreSQL connection is closed")
                    print console_colors.GREY+ "_" * width + console_colors.ENDC

                else:
                    pass

        except BaseException as e:
            print("Error on_data: %s" % str(e))

Пример ожидаемого результата в каждом цикле цикла for:

d['user']['screen_name']
fooUser

только один результат за каждый ход.

Фактический результат:

d['user']['screen_name']
fooUser
userNumber2
anotherUser

более одного результата за каждый оборот цикла.

Обновление:

Я обновляю код, и кажется, что результат d['user']['screen_name'], а также другие ключи успешно возвращают один элемент, прямо перед его вставкой в ​​таблицу PostgreSQL.

Однако , сообщение TypeError ('индексы списка должны быть целыми числами, а не str) все еще там. Все еще пробую другой вариант.

Это изменения:

...
try:
            # split each line of data
            for item in data.splitlines():
                # loads the data into json object
                d = json.loads(item)  
                # processing the data
                if d["user"]["lang"] == "id":
                    if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
                        teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
                        print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
...

и

...
try:
                            conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                            cursor = conn.cursor()
                            # insert ops
                            # check if the data is already separated
                            print d['user']['screen_name']
                            # insert data into table
                            sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                            cursor.execute(sqlquery, (str(d['user']['screen_name'].encode('utf-8')), str(d['extended_tweet']['full_text'].encode('utf-8')), str(d['entities']['hashtags']['text']), len(str(d['extended_tweet']['full_text'].encode('utf-8')).split())))

                            conn.commit()
                            count = cursor.rowcount
                            print (count, " Record inserted successfully into table")
                        except (Exception, psycopg2.Error) as error:
                            if(conn):
                                print("Failed to insert record into table", error)
                        finally:
                            #closing database connection.
                            if(conn):
                                cursor.close()
                                conn.close()
                                print(" [*] PostgreSQL connection is closed")
...

1 Ответ

0 голосов
/ 03 июня 2019

Наконец, после прочтения многих источников, проб и ошибок и удаления некоторого поля в таблице со списком в нем, я попытался просто изменить код так:

# split each line of data
     objects = data.splitlines()
     for line in objects:
        # loads the data into json object
        d = json.loads(line)
        # and then process the data...........

и, наконец, можно будет вставить данные в базу данных PostgreSQL. :)

...