Я пытаюсь передать курсор psycopg2 в твидовый поток.
Пул соединений и курсоры настраиваются в отдельном файле. Только курсор передается в качестве аргумента функции основного конвейера в другом файле с именем get_tweet_topic.py
. Мне нужен курсор в методе on_status()
, потому что у меня есть запрос, который нуждается в нем для выполнения.
Я не могу понять, как передать курсор на метод on_status()
в классе MyStreamListener()
.
Я получаю ошибку:
2020-03-05T22:16:24.856945+00:00 app[worker.1]: self._target(*self._args, **self._kwargs)
2020-03-05T22:16:24.856945+00:00 app[worker.1]: File "/app/get_tweet_topic.py", line 81, in guess_topic_pipeline
2020-03-05T22:16:24.856946+00:00 app[worker.1]: status_streams.streaming_pipeline(api, cursor)
2020-03-05T22:16:24.856947+00:00 app[worker.1]: File "/app/status_streams.py", line 100, in streaming_pipeline
2020-03-05T22:16:24.856947+00:00 app[worker.1]: general_stream(api, cursor)
2020-03-05T22:16:24.856948+00:00 app[worker.1]: File "/app/status_streams.py", line 86, in general_stream
2020-03-05T22:16:24.856948+00:00 app[worker.1]: myStreamListener = MyStreamListener()
2020-03-05T22:16:24.856948+00:00 app[worker.1]: TypeError: __init__() missing 1 required positional argument: 'cursor'
Код :
status_streams.py:
import tweepy
import os
import db_queries
import follow
#define class for the stream listener
class MyStreamListener(tweepy.StreamListener):
def __init__(self, cursor):
super().__init__()
self.cursor = cursor
#set counter to only get 1200 tweets
self.counter = 0
self.max = 1200
#get tweets
def on_status(self, status):
if not status.retweeted:
status_dict = {'created_at': status.created_at.strftime('%y-%m-&d %H:%M'),
'source_stream': 'general stream',
'status_id': status.id_str,
'user_id': status.user.id_str,
'screen_name': status.user.name,
'tweet_text': status.text,
'num_likes': status.favorite_count,
'num_retweets': status.retweet_count}
created_at = status_dict['created_at']
source_stream = status_dict['source_stream']
status_id = status_dict['status_id']
user_id = status_dict['user_id']
screen_name = status_dict['screen_name']
tweet_text = status_dict['tweet_text']
num_likes = status_dict['num_likes']
num_retweets = status_dict['num_retweets']
db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)
self.counter +=1
if self.counter == self.max:
return False
#get tweets from list of followers
def following_stream(api, cursor, user_name):
try:
for status in tweepy.Cursor(api.user_timeline, tweet_mode='extended', include_rts=False, screen_name=user_name).items(1):
#ignore retweets
if not status.retweeted:
status_dict = {'created_at': status.created_at.strftime('%y-%m-%d %H:%M'),
'source_stream': 'following stream',
'status_id': status.id_str,
'user_id': status.user.id_str,
'screen_name': status.user.name,
'tweet_text':status.full_text,
'num_likes':status.favorite_count,
'num_retweets':status.retweet_count}
created_at = status_dict['created_at']
source_stream = status_dict['source_stream']
status_id = status_dict['status_id']
user_id = status_dict['user_id']
screen_name = status_dict['screen_name']
tweet_text = status_dict['tweet_text']
num_likes = status_dict['num_likes']
num_retweets = status_dict['num_retweets']
db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)
#function that controls both streams
def streaming_pipeline(api, cursor):
#get list of all users that are currently followed
#iterate through the following_list and grab the single latest tweet
following_list = follow.get_following(api)
for user in following_list:
f_stream = following_stream(api, cursor, user)
#stream class is used here
myStreamListener = MyStreamListener()
stream = tweepy.Stream(auth=api.auth, listener=myStreamListener(cursor=self.cursor))
stream.filter(languages=['en'], track=['the'])
cursor.close()
соответствующий раздел get_tweet_topi c .py:
def guess_topic_pipeline(api, conn, model, corpus, classifier):
while True:
cursor = conn.cursor()
db_queries.create_temp_tweets_table(cursor)
conn.commit()
#use pipeline to grab tweets off twitter
print('Retrieving statuses from streams...')
status_streams.streaming_pipeline(api, cursor)
print('Done retrieving...')
соответствующая часть кода пула соединений:
* Функция 1023 *
insert_raw_tweets_table () с фактическим запросом:
def insert_raw_tweets_table(cursor, createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets):
cursor.execute('INSERT INTO tempTweets(createdAt, sourceStream, statusID, userID, screenName, tweetText) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)', (createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets))