Я пытаюсь использовать Clevertap api, используя модуль python asyncio. Clevertap имеет несколько ограничений, таких как только 3 одновременных запроса и максимальный размер пакета 5000.
Я кратко расскажу, что нам нужно сделать в этом: сначала получите первый курсор для события, задав такие параметры, как event_name и time окно. Затем с помощью этого курсора l oop через запросы и получить ответы, каждый из которых содержит количество записей batch_size. Подробнее здесь: Clevertap GET API
Вот мой код, но я получаю недетерминированные c результаты. Под «Неопределенным c» я подразумеваю, что когда я запрашиваю большое количество событий для выбранного временного окна. Некоторые события принимают результаты других событий. Пожалуйста, пролейте свет на меня.
async def get_first_cursor(sem, clevertap_url, clevertap_headers, data_str, query_string, event):
logger.info(f"Getting First Cursor for event {event}")
try:
async with sem:
async with aiohttp.ClientSession() as session:
async with session.post(url=clevertap_url, data=data_str, headers=clevertap_headers, params=query_string,
timeout=100) as r:
r.raise_for_status()
r_json = await r.json()
return r_json
except Exception as e:
logger.critical(f"Error for event {event} with exception {e!r}")
raise e
async def get_chunk_data(sem, query_string, event, clevertap_url, clevertap_headers):
logger.info(f"Getting chunk of data for event {event}")
try:
async with sem:
async with aiohttp.ClientSession() as session:
async with session.get(clevertap_url, headers=clevertap_headers,
params=query_string, timeout=100) as get_response:
get_response.raise_for_status()
r_json = await get_response.json()
return r_json
except Exception as e:
logger.critical(f"Error for event {event} with exception {e!r}")
raise e
def get_cursor(r_json):
if r_json.get('next_cursor') is not None and r_json.get('next_cursor') != '':
return unquote(r_json.get('next_cursor'))
elif r_json.get('cursor') is not None and r_json.get('cursor') != '':
return unquote(r_json.get('cursor'))
else:
return None
async def main(window_start_time, window_end_time, events_list, batch_size, retries):
clevertap_url = Settings.CLEVERTAP_EVENTS_API_URL
clevertap_headers = Settings.CLEVERTAP_HEADER
data, query_string = create_url_parameter(window_start_time, window_end_time, batch_size)
# queue = asyncio.Queue()
tasks = []
event_responses = {}
sem = asyncio.BoundedSemaphore(3)
for event in events_list:
data['event_name'] = event
data_str = json.dumps(data)
event_responses[event] = []
# get first cursor of each event
task = asyncio.create_task(
get_first_cursor(sem, clevertap_url, clevertap_headers, data_str, query_string, event)
)
tasks.append((task, event))
i = 0
new_set_of_tasks = []
while i < len(tasks):
r_json = await tasks[i][0]
cursor = get_cursor(r_json)
if cursor is not None:
task_new = asyncio.create_task(
get_chunk_data(sem, {'cursor': cursor}, tasks[i][1], clevertap_url, clevertap_headers)
)
new_set_of_tasks.extend([(task_new, tasks[i][1])])
i += 1
i = 0
while i < len(new_set_of_tasks):
r_json = await new_set_of_tasks[i][0]
cursor = get_cursor(r_json)
if r_json.get('records') is not None:
event_responses[tasks[i][1]].extend(r_json.get('records'))
if cursor is not None:
task_new = asyncio.create_task(
get_chunk_data(sem, {'cursor': cursor}, tasks[i][1], clevertap_url, clevertap_headers)
)
new_set_of_tasks.extend([(task_new, tasks[i][1])])
i += 1
total_count = 0
for event, records in event_responses.items():
total_count += len(records)
logger.info(f"event {event} has {len(records)} records")
logger.info(f"total count {total_count}")
Вот журнал того, что я вижу: Здесь у event1 больше 0 записей, но его результаты дублируют event4.
INFO 2020-06-19 00:46:34,799 get_clevertap_data_v2 81890 4402326976 event event1 has 0 records
INFO 2020-06-19 00:46:34,799 get_clevertap_data_v2 81890 4402326976 event event2 has 24 records
INFO 2020-06-19 00:46:34,799 get_clevertap_data_v2 81890 4402326976 event event3 has 1 records
INFO 2020-06-19 00:46:34,799 get_clevertap_data_v2 81890 4402326976 event event4 has 0 records