передача курсора psycopg2 в метод on_status () tweepy - PullRequest
0 голосов
/ 06 марта 2020

Я пытаюсь передать курсор psycopg2 в твидовый поток.

Пул соединений и курсоры настраиваются в отдельном файле. Только курсор передается в качестве аргумента функции основного конвейера в другом файле с именем get_tweet_topic.py. Мне нужен курсор в методе on_status(), потому что у меня есть запрос, который нуждается в нем для выполнения.

Я не могу понять, как передать курсор на метод on_status() в классе MyStreamListener().

Я получаю ошибку:

2020-03-05T22:16:24.856945+00:00 app[worker.1]: self._target(*self._args, **self._kwargs)
2020-03-05T22:16:24.856945+00:00 app[worker.1]: File "/app/get_tweet_topic.py", line 81, in guess_topic_pipeline
2020-03-05T22:16:24.856946+00:00 app[worker.1]: status_streams.streaming_pipeline(api, cursor)
2020-03-05T22:16:24.856947+00:00 app[worker.1]: File "/app/status_streams.py", line 100, in streaming_pipeline
2020-03-05T22:16:24.856947+00:00 app[worker.1]: general_stream(api, cursor)
2020-03-05T22:16:24.856948+00:00 app[worker.1]: File "/app/status_streams.py", line 86, in general_stream
2020-03-05T22:16:24.856948+00:00 app[worker.1]: myStreamListener = MyStreamListener()
2020-03-05T22:16:24.856948+00:00 app[worker.1]: TypeError: __init__() missing 1 required positional argument: 'cursor'

Код :

status_streams.py:

import tweepy
import os

import db_queries
import follow

#define class for the stream listener
class MyStreamListener(tweepy.StreamListener):

    def __init__(self, cursor):
        super().__init__()
        self.cursor = cursor
        #set counter to only get 1200 tweets
        self.counter = 0
        self.max = 1200

    #get tweets
    def on_status(self, status):
        if not status.retweeted:
            status_dict = {'created_at': status.created_at.strftime('%y-%m-&d %H:%M'),
                    'source_stream': 'general stream',
                    'status_id': status.id_str,
                    'user_id': status.user.id_str,
                    'screen_name': status.user.name,
                    'tweet_text': status.text,
                    'num_likes': status.favorite_count,
                    'num_retweets': status.retweet_count}

            created_at = status_dict['created_at']
            source_stream = status_dict['source_stream']
            status_id = status_dict['status_id']
            user_id = status_dict['user_id']
            screen_name = status_dict['screen_name']
            tweet_text = status_dict['tweet_text']
            num_likes = status_dict['num_likes']
            num_retweets = status_dict['num_retweets']

            db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)

        self.counter +=1
        if self.counter == self.max:
            return False


#get tweets from list of followers
def following_stream(api, cursor, user_name):
    try:
        for status in tweepy.Cursor(api.user_timeline, tweet_mode='extended', include_rts=False, screen_name=user_name).items(1):
            #ignore retweets
            if not status.retweeted:
                status_dict = {'created_at': status.created_at.strftime('%y-%m-%d %H:%M'),
                               'source_stream': 'following stream',
                               'status_id': status.id_str,
                               'user_id': status.user.id_str,
                               'screen_name': status.user.name,
                               'tweet_text':status.full_text,
                               'num_likes':status.favorite_count,
                               'num_retweets':status.retweet_count}

                created_at = status_dict['created_at']
                source_stream = status_dict['source_stream']
                status_id = status_dict['status_id']
                user_id = status_dict['user_id']
                screen_name = status_dict['screen_name']
                tweet_text = status_dict['tweet_text']
                num_likes = status_dict['num_likes']
                num_retweets = status_dict['num_retweets']

                db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)


#function that controls both streams
def streaming_pipeline(api, cursor):
    #get list of all users that are currently followed
    #iterate through the following_list and grab the single latest tweet
    following_list = follow.get_following(api)
    for user in following_list:
        f_stream = following_stream(api, cursor, user)

    #stream class is used here
    myStreamListener = MyStreamListener()
    stream = tweepy.Stream(auth=api.auth, listener=myStreamListener(cursor=self.cursor))
    stream.filter(languages=['en'], track=['the'])


    cursor.close()

соответствующий раздел get_tweet_topi c .py:

def guess_topic_pipeline(api, conn, model, corpus, classifier):

    while True:
        cursor = conn.cursor()
        db_queries.create_temp_tweets_table(cursor)
        conn.commit()

        #use pipeline to grab tweets off twitter
        print('Retrieving statuses from streams...')
        status_streams.streaming_pipeline(api, cursor)
        print('Done retrieving...')

соответствующая часть кода пула соединений:

* Функция 1023 *

insert_raw_tweets_table () с фактическим запросом:

def insert_raw_tweets_table(cursor, createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets):
    cursor.execute('INSERT INTO tempTweets(createdAt, sourceStream, statusID, userID, screenName, tweetText) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)', (createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets))

1 Ответ

0 голосов
/ 07 марта 2020

@ MauriceMeyer ответил на вопрос в комментариях, но вот рабочий код для ясности.

Я забыл ссылаться на курсор как self.cursor внутри класса, и я забыл передать курсор в качестве аргумента при создании экземпляра класса. Я передавал курсор в качестве аргумента после создания экземпляра, что неверно.

Правильный код:

class MyStreamListener(tweepy.StreamListener):

    def __init__(self, cursor):
        super().__init__()
        self.cursor = cursor
        #set counter to only get 1200 tweets
        self.counter = 0
        self.max = 1200

    #get tweets
    def on_status(self, status):
        if not status.retweeted:
            status_dict = {'created_at' : status.created_at.strftime('%y-%m-&d %H:%M'),
                           'source_stream' : 'general stream',
                           'status_id' : status.id_str,
                           'user_id' : status.user.id_str,
                           'screen_name' : status.user.name,
                           'tweet_text' : status.text,
                           'num_likes' : status.favorite_count,
                           'num_retweets' : status.retweet_count}

            created_at = status_dict['created_at']
            source_stream = status_dict['source_stream']
            status_id = status_dict['status_id']
            user_id = status_dict['user_id']
            screen_name = status_dict['screen_name']
            tweet_text = status_dict['tweet_text']
            num_likes = status_dict['num_likes']
            num_retweets = status_dict['num_retweets']

                                                   #▼ reference self.cursor here
            db_queries.insert_raw_tweets_table(self.cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)

        self.counter +=1
        if self.counter == self.max:
            return False




#stream class is used here                ▼ reference cursor here
    myStreamListener = MyStreamListener(cursor)
                                                                 #▼ removed reference to cursor here
    stream = tweepy.Stream(auth=api.auth, listener=myStreamListener)
    stream.filter(languages=['en'], track=['the'])
...