запросы зависают через некоторое время, используя asyncio и aiohttp - PullRequest
0 голосов
/ 14 мая 2019

Я использую asyncio и aiohttp, чтобы собрать пакет запросов и выполнить их асинхронно.У меня более 500 000 пользователей в моей базе данных для обработки.

Я предварительно обработал этих 500 000 ~ пользователей партиями по 100, так как API может обрабатывать 100 пользователей за один запрос.

Затем я делаю группу, чтобы сделать 10запросов в то же время, у меня есть 919 запросов в общей сложности.Однако через определенное время запросы зависают / замирают, и я замечаю, что мой компьютер становится вялым.

import asyncio
import aiohttp
import config
from api import TwitterAPI
import motor.motor_asyncio
import itertools
from helper import create_batches, divide_chunks
import time

COUNT = 0
SCREEN_NAME = "MENnewsdesk"


async def fetch_user_objects(apps, session, user_id, tweet_mode):


    batch_start = time.time()

    global COUNT
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    db = client['twitter']


    while True:

        if (should_reset(apps) == True):
            print("Reseting Apps: Sleep 15 Minutes")
            time.sleep(901)
            apps = init_twitter_engine()

        app = get_available_app(apps)

        response = await app.get_users_lookup(session, user_id=user_id, tweet_mode="extended")

        if response['status'] != 200:
            app.rate_limited = True
            continue

        for x in response['result']:
            await db["MENnewsdesk"].update_one({"user_id": x["id"]}, {"$set": {
                "user": x
            }})

        COUNT += len(response['result'])

        batch_end = time.time()
        print("APP ID: {app_id}, RATE LIMITED: {app_remaining}, TOTAL USERS: {total_users}, BATCH USERS: {batch_users}, BATCH TIME: {batch_time}"
            .format(app_id=app.app_id, app_remaining=app.rate_limited, total_users=COUNT, batch_users=len(response['result']), batch_time=round((batch_end - batch_start), 2)))

        return response




async def main():

    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    db = client['twitter']

    twitter_engine = init_twitter_engine()

    async with aiohttp.ClientSession() as session:

        users = []
        cursor = db[SCREEN_NAME].find(
        {"user": {"$exists": False}}, {"user_id": 1, "_id": 0})
        async for document in cursor:
          users.append(document["user_id"])

        user_list = create_batches(users)
        print(len(user_list))
        batches = list(divide_chunks(user_list, 10))
        print(len(batches))

        for i, batch in enumerate(batches):
            tasks = [asyncio.ensure_future(fetch_user_objects(twitter_engine, session, user_id=users, tweet_mode="extended")) for users in batch]
            for t in tasks:
                d = await t


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Я не уверен, что здесь происходит, как я могу отладить эту проблему?

...