Я пытаюсь проанализировать данные из нескольких объектов JSON, возвращенных методом tweepy
on_data
(каждая строка потока данных содержит один объект JSON, каждый поток данных содержит от одной до нескольких строк), и отправить его в PostgreSQL DB, но он всегда не может вставить запись в таблицу с сообщением TypeError ('индексы списка должны быть целыми числами, а не str).
Я не уверен, но я думаю, что проблема находится в коде, где я пытаюсь data.splitlines()
и читать / обрабатывать каждую строку в PostgreSQL. Однако, когда я пытался распечатать данные на консоли, он всегда возвращает несколько строк данных и не может обработать их одну за другой.
Я настраиваю скрипт Python для сбора данных потокового твиттера с помощью tweepy
и пытаюсь сохранить данные для Apache Spark и PostgreSQL DB, чтобы я мог загрузить их в Grafana.
Я также попробовал несколько решений из здесь , здесь и здесь . Однако, похоже, что проблема все еще существует.
def on_data(self, data):
(width, height) = getTerminalSize()
try:
# here data comes in a multiple json object each in a line; but when i'm trying to read it line by line using splitlines() and for loop for each line it still return multiple lines so that i can't isert into the postgresql table
lines = data.splitlines()
for line in lines:
d = json.loads(line)
# processing the data
if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
# send data to socket for processing in spark
self.client_socket.send(data)
# insert record to table
try:
conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
cursor = conn.cursor()
# insert ops
item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
conn.commit()
count = cursor.rowcount
print (count, " Record inserted successfully into table")
except (Exception, psycopg2.Error) as error:
if(conn):
print("Failed to insert record into table", error)
finally:
#closing database connection.
if(conn):
cursor.close()
conn.close()
print(" [*] PostgreSQL connection is closed")
print console_colors.GREY+ "_" * width + console_colors.ENDC
elif 'extended_tweet' not in d and 'RT @' not in d['text'].encode('utf-8'):
teks_twit = str(d['text'].encode('utf-8'))
print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
# send data to socket for processing in spark
self.client_socket.send(data)
# insert record to table
try:
conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
cursor = conn.cursor()
# insert ops
item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
conn.commit()
count = cursor.rowcount
print (count, " Record inserted successfully into table")
except (Exception, psycopg2.Error) as error :
if(conn):
print("Failed to insert record into table", error)
finally:
#closing database connection.
if(conn):
cursor.close()
conn.close()
print(" [*] PostgreSQL connection is closed")
print console_colors.GREY+ "_" * width + console_colors.ENDC
else:
pass
except BaseException as e:
print("Error on_data: %s" % str(e))
Пример ожидаемого результата в каждом цикле цикла for:
d['user']['screen_name']
fooUser
только один результат за каждый ход.
Фактический результат:
d['user']['screen_name']
fooUser
userNumber2
anotherUser
более одного результата за каждый оборот цикла.
Обновление:
Я обновляю код, и кажется, что результат d['user']['screen_name']
, а также другие ключи успешно возвращают один элемент, прямо перед его вставкой в таблицу PostgreSQL.
Однако , сообщение TypeError ('индексы списка должны быть целыми числами, а не str) все еще там. Все еще пробую другой вариант.
Это изменения:
...
try:
# split each line of data
for item in data.splitlines():
# loads the data into json object
d = json.loads(item)
# processing the data
if d["user"]["lang"] == "id":
if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
...
и
...
try:
conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
cursor = conn.cursor()
# insert ops
# check if the data is already separated
print d['user']['screen_name']
# insert data into table
sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
cursor.execute(sqlquery, (str(d['user']['screen_name'].encode('utf-8')), str(d['extended_tweet']['full_text'].encode('utf-8')), str(d['entities']['hashtags']['text']), len(str(d['extended_tweet']['full_text'].encode('utf-8')).split())))
conn.commit()
count = cursor.rowcount
print (count, " Record inserted successfully into table")
except (Exception, psycopg2.Error) as error:
if(conn):
print("Failed to insert record into table", error)
finally:
#closing database connection.
if(conn):
cursor.close()
conn.close()
print(" [*] PostgreSQL connection is closed")
...