Я использую asyncio и aiohttp, чтобы собрать пакет запросов и выполнить их асинхронно.У меня более 500 000 пользователей в моей базе данных для обработки.
Я предварительно обработал этих 500 000 ~ пользователей партиями по 100, так как API может обрабатывать 100 пользователей за один запрос.
Затем я делаю группу, чтобы сделать 10запросов в то же время, у меня есть 919 запросов в общей сложности.Однако через определенное время запросы зависают / замирают, и я замечаю, что мой компьютер становится вялым.
import asyncio
import aiohttp
import config
from api import TwitterAPI
import motor.motor_asyncio
import itertools
from helper import create_batches, divide_chunks
import time
COUNT = 0
SCREEN_NAME = "MENnewsdesk"
async def fetch_user_objects(apps, session, user_id, tweet_mode):
batch_start = time.time()
global COUNT
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
db = client['twitter']
while True:
if (should_reset(apps) == True):
print("Reseting Apps: Sleep 15 Minutes")
time.sleep(901)
apps = init_twitter_engine()
app = get_available_app(apps)
response = await app.get_users_lookup(session, user_id=user_id, tweet_mode="extended")
if response['status'] != 200:
app.rate_limited = True
continue
for x in response['result']:
await db["MENnewsdesk"].update_one({"user_id": x["id"]}, {"$set": {
"user": x
}})
COUNT += len(response['result'])
batch_end = time.time()
print("APP ID: {app_id}, RATE LIMITED: {app_remaining}, TOTAL USERS: {total_users}, BATCH USERS: {batch_users}, BATCH TIME: {batch_time}"
.format(app_id=app.app_id, app_remaining=app.rate_limited, total_users=COUNT, batch_users=len(response['result']), batch_time=round((batch_end - batch_start), 2)))
return response
async def main():
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
db = client['twitter']
twitter_engine = init_twitter_engine()
async with aiohttp.ClientSession() as session:
users = []
cursor = db[SCREEN_NAME].find(
{"user": {"$exists": False}}, {"user_id": 1, "_id": 0})
async for document in cursor:
users.append(document["user_id"])
user_list = create_batches(users)
print(len(user_list))
batches = list(divide_chunks(user_list, 10))
print(len(batches))
for i, batch in enumerate(batches):
tasks = [asyncio.ensure_future(fetch_user_objects(twitter_engine, session, user_id=users, tweet_mode="extended")) for users in batch]
for t in tasks:
d = await t
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Я не уверен, что здесь происходит, как я могу отладить эту проблему?