спасибо заранее, и, пожалуйста, позвольте мне не знать о области многопроцессорности / многопоточности. Я очень новичок в этом ...
В настоящее время мне удается получить все твиты, содержащие слово 'Yolo', и сохранить его в моей локальной БД (способом SQLite).
Теперь яЯ думаю об использовании SQS.
Допустим,
1) Я слушаю все случайные твиты с сервера Twitter 2) Если текущий твит (статус) - это то, что я хочу, то я хочусохранить это в БД. Это то, что делает функция on_status (status).
2-1) Допустим, (не в моем коде, но ..) я успешно создал temp_array, который должен содержать каждые 100 твитов (содержащих «yolo») в SharedПамять (слышал, что я должен использовать это, если я хочу использовать многопроцессорность для работы)
3) пока def on_status выполняет свою работу, я хочу использовать отдельный процессор (многопроцессорность) для отправки этих накопленных 100 твитов(в temp_array в разделяемой памяти) с помощью имени функции send_to_sqs ().
Для шагов 2) и 3) я хочу использовать два разных процессора.
Но я понял, что это нелегко (для меня), так как функции send_to_sqs () и update_tweet_bundle () и таблица.insert (tweet_dict) все работает в CLASS! (не на главном).
Может кто-нибудь подсказать мне, как использовать многопроцессорность в этом случае? (несколько функций должны запускать отдельный Core и выполняться в управляемом событиями классе?)
PS
Причина, по которой я хочу разделить эти функции, заключается в том, что при отправке накопленных 100 твитов в SQS, яхочу предотвратить наихудший случай: отправляя его в SQS, я могу пропустить несколько ценных твитов, так как функция send_to_sqs () работает и ценный твит всегда приходит в мой слушатель (класс StreamListener).
class StreamListener(tweepy.StreamListener):
def __init__(self):
super(StreamListener, self).__init__()
self.temp_arr = []
self.len_temp_arr = 0
def is_useless(self, status):
return True if len(status.text) < 10 else False
def update_tweet_bundle(self, new_tweet):
self.temp_arr.append(new_tweet)
self.len_temp_arr+=1
def send_to_sqs(self):
# (1)
# send temp_Arr to SQS!
# inside temp_arr, there should be 100 tweets as a dictionary formats.
print('we sent tweet bundle to SQS(ASSUME!)')
# (2)
# empty out the temp_arr(suppose this in Shared Memory)
self.temp_arr = []
self.len_temp_arr = 0
def on_status(self, status):
if self.is_useless(status):
return
description = status.user.description
text = status.text
table = db[settings.TABLE_NAME]
try:
tweet_dict = dict(
user_description=description,
text=text,
)
table.insert(tweet_dict)
if self.len_temp_arr % 100 == 0:
self.send_to_sqs()
else:
self.update_tweet_bundle(tweet_dict) # I added for multi-thread..for later..
except ProgrammingError as err:
print(err)
def on_error(self, status_code):
pass
def main():
stream_listner = StreamListner()
stream = tweepy.Stream(auth = api.auth, listner = stream_listner)
stream.filter(track=['yolo'])
if __name__ == '__main__':
main()