aiohttp.client_exceptions.ClientConnectionError происходит при пакетном запросе - PullRequest
0 голосов
/ 03 апреля 2019

Я собираюсь запросить данные из FB и сохранить их в моей локальной базе данных.

Я хочу использовать ThreadPoolExecutor + asyncio, чтобы добавить запрос как можно больше.

И многие данные были успешно обработаны, за исключением некоторых неудачных элементов. (1.8w data successed, 1.9k failed)

Основной код здесь:

class PullCampaignDataService(object):

    def __init__(self):
        self.fb_user_list = FbUser.objects.filter(is_active=1)
        self.max_worker = 5
        self.chunk_size = 20

    def run(self):
        with ThreadPoolExecutor(max_workers=self.max_worker) as executor:
            executor.map(self._get_accounts_by_fb_user, self.fb_user_list)

    def _get_accounts_by_fb_user(self, fb_user):
        my_api = FacebookAdsApi(FacebookSession(settings.FB_APP_ID, settings.FB_APP_SECRET, fb_user.access_token))
        me = User(fbid='me', api=my_api)
        ad_accounts = [ {'account_id': str(account['account_id']), "access_token": fb_user.access_token} for account in  me.get_ad_accounts(
            fields=[AdAccount.Field.id, AdAccount.Field.account_id])]

        iter_accounts = (
            ad_accounts[i:i + self.chunk_size] for i in range(0, len(ad_accounts), self.chunk_size)
        )

        new_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(new_loop)
        for chucnk_ad_accounts in iter_accounts:
            task_list = [
                asyncio.ensure_future(self._handle_account_campaign(ad_account)) for ad_account in chucnk_ad_accounts
            ]
            new_loop.run_until_complete(asyncio.gather(*task_list))

     async def _handle_account_campaign(self, ad_account):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
       settings.FB_GET_CAMPAIGN_URL.format(account_id=ad_account['account_id'], access_token=ad_account['access_token'])
                ) as response:
                    result = json.loads(await response.text())
                    # ..
        except aiohttp.client_exceptions.ClientConnectionError as E:
            info_logger.info("bad request connect error: " + str(E))

Вся ошибка, которую я поймал, была Cannot connect to host graph.facebook.com.

Файл журнала ошибок:

# 'format': '%(asctime)s %(levelname)s %(module)s %(thread)d %(message)s'

...
2019-04-03 16:58:23,012 INFO pull_campaign_data  336704 bad request connect error: Cannot connect to host graph.facebook.com: 443 ssl:True [None]

2019-04-03 16:58:23,015 INFO pull_campaign_data  336704 bad request connect error: Cannot connect to host graph.facebook.com: 443 ssl:True [None]

2019-04-03 16:58:23,027 INFO pull_campaign_data  336704 bad request connect error: Cannot connect to host graph.facebook.com: 443 ssl:True [None]

...

Я обнаружил, что ошибка произошла среди 3 разных потоков, но большинство ошибочных данных были в одном указанном потоке thread_336704

# 1903 fail total

thread_6244480   1 failed

thread_846848   22 failed

thread_336704   1880  failed

Я не знаю, как решить эту проблему. Это связано с тем, как я использую многопоточность и asyncio или что-то?

Я уже искал в стеке и Google, но я не могу найти решение

...