можно ли использовать многопроцессорную обработку с классом Tweepy StreamListner заказчика? - PullRequest
1 голос
/ 23 октября 2019

спасибо заранее, и, пожалуйста, позвольте мне не знать о области многопроцессорности / многопоточности. Я очень новичок в этом ...

В настоящее время мне удается получить все твиты, содержащие слово 'Yolo', и сохранить его в моей локальной БД (способом SQLite).

Теперь яЯ думаю об использовании SQS.

Допустим,

1) Я слушаю все случайные твиты с сервера Twitter 2) Если текущий твит (статус) - это то, что я хочу, то я хочусохранить это в БД. Это то, что делает функция on_status (status).

2-1) Допустим, (не в моем коде, но ..) я успешно создал temp_array, который должен содержать каждые 100 твитов (содержащих «yolo») в SharedПамять (слышал, что я должен использовать это, если я хочу использовать многопроцессорность для работы)

3) пока def on_status выполняет свою работу, я хочу использовать отдельный процессор (многопроцессорность) для отправки этих накопленных 100 твитов(в temp_array в разделяемой памяти) с помощью имени функции send_to_sqs ().

Для шагов 2) и 3) я хочу использовать два разных процессора.

Но я понял, что это нелегко (для меня), так как функции send_to_sqs () и update_tweet_bundle () и таблица.insert (tweet_dict) все работает в CLASS! (не на главном).

Может кто-нибудь подсказать мне, как использовать многопроцессорность в этом случае? (несколько функций должны запускать отдельный Core и выполняться в управляемом событиями классе?)

PS

Причина, по которой я хочу разделить эти функции, заключается в том, что при отправке накопленных 100 твитов в SQS, яхочу предотвратить наихудший случай: отправляя его в SQS, я могу пропустить несколько ценных твитов, так как функция send_to_sqs () работает и ценный твит всегда приходит в мой слушатель (класс StreamListener).

class StreamListener(tweepy.StreamListener):
    def __init__(self):
        super(StreamListener, self).__init__()
        self.temp_arr = []
        self.len_temp_arr = 0

    def is_useless(self, status):
        return True if len(status.text) < 10 else False

    def update_tweet_bundle(self, new_tweet):
        self.temp_arr.append(new_tweet)
        self.len_temp_arr+=1

    def send_to_sqs(self):
        # (1)
        # send temp_Arr to SQS!
        #   inside temp_arr, there should be 100 tweets as a dictionary formats.
        print('we sent tweet bundle to SQS(ASSUME!)')

        # (2)
        # empty out the temp_arr(suppose this in Shared Memory)
        self.temp_arr = []
        self.len_temp_arr = 0

    def on_status(self, status):
        if self.is_useless(status):
            return

        description = status.user.description
        text = status.text


        table = db[settings.TABLE_NAME]

        try:
            tweet_dict = dict(
                user_description=description,
                text=text,
            )
            table.insert(tweet_dict)
            if self.len_temp_arr % 100 == 0:
                self.send_to_sqs()
            else:
                self.update_tweet_bundle(tweet_dict) # I added for multi-thread..for later..

        except ProgrammingError as err:
            print(err)

    def on_error(self, status_code):
        pass

def main():

    stream_listner = StreamListner()
    stream = tweepy.Stream(auth = api.auth, listner = stream_listner)
    stream.filter(track=['yolo'])

if __name__ == '__main__':

    main()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...