python asyncio разбивать курсор на страницы с помощью aiohttp - PullRequest
0 голосов
/ 18 июня 2020

Я пытаюсь использовать Clevertap api, используя модуль python asyncio. Clevertap имеет несколько ограничений, таких как только 3 одновременных запроса и максимальный размер пакета 5000.

Я кратко расскажу, что нам нужно сделать в этом: сначала получите первый курсор для события, задав такие параметры, как event_name и time окно. Затем с помощью этого курсора l oop через запросы и получить ответы, каждый из которых содержит количество записей batch_size. Подробнее здесь: Clevertap GET API

Вот мой код, но я получаю недетерминированные c результаты. Под «Неопределенным c» я подразумеваю, что когда я запрашиваю большое количество событий для выбранного временного окна. Некоторые события принимают результаты других событий. Пожалуйста, пролейте свет на меня.

async def get_first_cursor(sem, clevertap_url, clevertap_headers, data_str, query_string, event):
    logger.info(f"Getting First Cursor for event {event}")
    try:
        async with sem:
            async with aiohttp.ClientSession() as session:
                async with session.post(url=clevertap_url, data=data_str, headers=clevertap_headers, params=query_string,
                                        timeout=100) as r:
                    r.raise_for_status()
                    r_json = await r.json()
                    return r_json
    except Exception as e:
        logger.critical(f"Error for event {event} with exception {e!r}")
        raise e


async def get_chunk_data(sem, query_string, event, clevertap_url, clevertap_headers):
    logger.info(f"Getting chunk of data for event {event}")
    try:
        async with sem:
            async with aiohttp.ClientSession() as session:
                async with session.get(clevertap_url, headers=clevertap_headers,
                                       params=query_string, timeout=100) as get_response:
                    get_response.raise_for_status()
                    r_json = await get_response.json()
                    return r_json
    except Exception as e:
        logger.critical(f"Error for event {event} with exception {e!r}")
        raise e


def get_cursor(r_json):
    if r_json.get('next_cursor') is not None and r_json.get('next_cursor') != '':
        return unquote(r_json.get('next_cursor'))
    elif r_json.get('cursor') is not None and r_json.get('cursor') != '':
        return unquote(r_json.get('cursor'))
    else:
        return None

async def main(window_start_time, window_end_time, events_list, batch_size, retries):
    clevertap_url = Settings.CLEVERTAP_EVENTS_API_URL
    clevertap_headers = Settings.CLEVERTAP_HEADER
    data, query_string = create_url_parameter(window_start_time, window_end_time, batch_size)
    # queue = asyncio.Queue()
    tasks = []
    event_responses = {}
    sem = asyncio.BoundedSemaphore(3)
    for event in events_list:
        data['event_name'] = event
        data_str = json.dumps(data)
        event_responses[event] = []
        # get first cursor of each event
        task = asyncio.create_task(
            get_first_cursor(sem, clevertap_url, clevertap_headers, data_str, query_string, event)
        )
        tasks.append((task, event))

    i = 0
    new_set_of_tasks = []
    while i < len(tasks):
        r_json = await tasks[i][0]
        cursor = get_cursor(r_json)
        if cursor is not None:
            task_new = asyncio.create_task(
                get_chunk_data(sem, {'cursor': cursor}, tasks[i][1], clevertap_url, clevertap_headers)
            )
            new_set_of_tasks.extend([(task_new, tasks[i][1])])
        i += 1

    i = 0
    while i < len(new_set_of_tasks):
        r_json = await new_set_of_tasks[i][0]
        cursor = get_cursor(r_json)
        if r_json.get('records') is not None:
            event_responses[tasks[i][1]].extend(r_json.get('records'))
        if cursor is not None:
            task_new = asyncio.create_task(
                get_chunk_data(sem, {'cursor': cursor}, tasks[i][1], clevertap_url, clevertap_headers)
            )
            new_set_of_tasks.extend([(task_new, tasks[i][1])])
        i += 1

    total_count = 0
    for event, records in event_responses.items():
        total_count += len(records)
        logger.info(f"event {event} has {len(records)} records")
    logger.info(f"total count {total_count}")

Вот журнал того, что я вижу: Здесь у event1 больше 0 записей, но его результаты дублируют event4.

INFO 2020-06-19 00:46:34,799 get_clevertap_data_v2 81890 4402326976 event event1 has 0 records
INFO 2020-06-19 00:46:34,799 get_clevertap_data_v2 81890 4402326976 event event2 has 24 records
INFO 2020-06-19 00:46:34,799 get_clevertap_data_v2 81890 4402326976 event event3 has 1 records
INFO 2020-06-19 00:46:34,799 get_clevertap_data_v2 81890 4402326976 event event4 has 0 records
...