Как разбить асинхронные запросы на python, если общее количество указано в заголовке - PullRequest
0 голосов
/ 01 мая 2020

* Просто чтобы уточнить заранее, я использую Почтальон для проверки своих запросов, и они возвращают результаты, которые я ищу.

Я подключаюсь к API, используя Python. API будет возвращать только 500 записей за запрос и предоставит общее количество записей в первом заголовке ответа «x-test-count»

Я, очевидно, не python подкован и чувствую, что я обрабатываю нумерация страниц совершенно неправильная. Посмотрите на функцию asyn c get. По сути, он берет общее количество от первого ответа и проходит по циклу:

                            async with session.get(paging_url) as response:

                                page_results = await response.json()

                                pages.extend(page_results)

Он возвращает результаты, но только 500. Поэтому может показаться, что он не захватывает каждую итерацию.

class Queue:

    def __init__(self, id, type):
        self.id = id
        self.type = type
        self.requests = []

class Test:

    def __init__(self):
        self.queue = []
        self.queue_list = []
        self.coroutines = []
        self.headers = {
            'Content-Type': 'application/json',
            'x-test-token': self.token,
        }

    def get_id(self, type=''):
        id = datetime.now().strftime('%Y%m-%d%H-%M%S-') + str(uuid4())
        if type != '':
            id = type + '-' + id
        return id

    def url_encode(self, url):

        # doesn't like encoding urls using yarl. I'm manually handling them below with UTF-8 encode
        url = url.replace(' ', '%20')
        #url = url.replace('?', '%3F')

        return url

    def queue_create(self, type=''):

        id = self.get_id(type='queue')

        if type == '':
            self.debug('Error: queue_create was not given a type')
            return

        id = Queue(id=id, type=type)
        self.debug('queue_create instantiated new queue class named: ' + id)

        # TODO: Add to list of active queues to track for create and destroy

        # Return name of new object
        return id

    def queue_run(self, name=''):
        self.debug('Starting queue_run')

        if name == '':
            self.debug('Error: queue_run asked to run without providing a name')
            #return

        **async def get(url, headers):
            async with aiohttp.ClientSession(headers=headers, connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
                async with session.get(url) as response:
                    self.debug('HTTP Response: ' + str(response.status))
                    # Set pagination vars to 1
                    current_page = 1
                    page_range = 1
                    # Check the status code. If other than 200, stop
                    assert response.status == 200
                    # Get the count of records. If not provided, set last_page to 1
                    try:
                        page_range = int(response.headers['x-test-count'])
                        self.debug(response.headers['x-test-count'])
                    except:
                        self.debug('x-test-count not provided, defaulted to 1')
                    first_page_results = await response.json()
                    if page_range == 1:
                        self.debug('Returning first page results only')
                        return first_page_results
                    else:
                        self.debug('Total results: ' + str(page_range) + '. Performing additional requests.')
                        pages = []
                        for records in range(1,page_range,500):
                            remaining_records = page_range - records
                            if remaining_records > 500:
                                paging_size = 500
                            else:
                                paging_size = remaining_records
                            # Create the paging URL
                            paging_url = url + '&size=' + str(paging_size) + '&from=' + str(records)
                            # Run paged requests
                            async with session.get(paging_url) as response:
                                page_results = await response.json()
                                # combine paged requests
                                pages.extend(page_results)
                            # Clear paging URL
                            paging_url = ''
                    return pages**

        # Establish the loop
        loop = asyncio.get_event_loop()

        # Establish coroutines and populate with queries from queue
        coroutines = []

        for query in self.queue:

            # Removed a lot of the actual code here. Basically, this establishes the URL and appends coroutines
                coroutines.append(get(url, headers=headers))


        # Start the asyncio loop

        results = loop.run_until_complete(asyncio.gather(*coroutines))

        return results

    def add_request(self, type, endpoint, query='', deleted=False, data='', full=False, paging_size='', paging_from=''):

        self.debug('Starting add_request')
        self.debug('Queue before append: ', item=self.queue)

        self.queue.append([type, endpoint, query, deleted, data, full, paging_size, paging_from]) 
        self.debug('Queue after append: ', item=self.queue)

        return self.queue

Итак, для запуска это выглядит примерно так

Test = Test()

Test.add_request('read', 'personnel', '', full=True ,deleted=False)

response = Test.queue_run()
...