Обработка sure_future и отсутствующих задач - PullRequest
0 голосов
/ 05 марта 2019

У меня есть потоковое приложение, которое почти непрерывно принимает данные, введенные в качестве входных данных, и отправляет HTTP-запрос, используя это значение, и делает что-то с возвращенным значением.

Очевидно, чтобы ускорить процесс, я использовал asyncio иБиблиотеки aiohttp в Python 3.7, чтобы получить лучшую производительность, но становится трудно отлаживать, учитывая, как быстро перемещаются данные.

Вот так выглядит мой код

'''
Gets the final requests
'''
async def apiRequest(info, url, session, reqType, post_data=''):
    if reqType:
        async with session.post(url, data = post_data) as response:
            info['response'] = await response.text()
    else:
        async with session.get(url+post_data) as response:
            info['response'] =  await response.text()
    logger.debug(info)
    return info

'''
Loops through the batches and sends it for request
'''
async def main(data, listOfData):
    tasks = []
    async with ClientSession() as session:
        for reqData in listOfData:
            try:
                task = asyncio.ensure_future(apiRequest(**reqData))
                tasks.append(task)
            except Exception as e:
                print(e)
                exc_type, exc_obj, exc_tb = sys.exc_info()
                fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
                print(exc_type, fname, exc_tb.tb_lineno)
        responses = await asyncio.gather(*tasks)
    return responses #list of APIResponses

'''
Streams data in and prepares batches to send for requests
'''
async def Kconsumer(data, loop, batchsize=100):
        consumer = AIOKafkaConsumer(**KafkaConfigs)
        await consumer.start()
        dataPoints = []
        async for msg in consumer:
            try:
                sys.stdout.flush()
                consumedMsg = loads(msg.value.decode('utf-8'))
                if consumedMsg['tid']:
                    dataPoints.append(loads(msg.value.decode('utf-8')))
                if len(dataPoints)==batchsize or time.time() - startTime>5:
                    '''
                    #1: The task below goes and sends HTTP GET requests in bulk using aiohttp
                    '''
                    task = asyncio.ensure_future(getRequests(data, dataPoints))
                    res = await asyncio.gather(*[task])
                    if task.done():
                        outputs = []
                        '''
                        #2: Does some ETL on the returned values
                        '''
                        ids = await asyncio.gather(*[doSomething(**{'tid':x['tid'],
                                                'cid':x['cid'], 'tn':x['tn'],
                                                'id':x['id'], 'ix':x['ix'],
                                                'ac':x['ac'], 'output':to_dict(xmltodict.parse(x['response'],encoding='utf-8')),
                                                'loop':loop, 'option':1}) for x in res[0]])
                        simplySaveDataIntoDataBase(id) # This is where I see some missing data in the database
                    dataPoints = []
            except Exception as e:
                    logger.error(e)
                    logger.error(traceback.format_exc())
                    exc_type, exc_obj, exc_tb = sys.exc_info()
                    fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
                    logger.error(str(exc_type) +' '+ str(fname) +' '+ str(exc_tb.tb_lineno))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(Kconsumer(data, loop, batchsize=100))
    loop.run_forever()

Нужно ли обеспечить sure_futureбыть await ред?Как aiohttp обрабатывает запросы, которые приходят немного позже, чем другие?Разве он не должен сдерживать всю партию вместо того, чтобы забывать об этом?

1 Ответ

0 голосов
/ 06 марта 2019

Нужно ли ensure_future быть await ed?

Да, и ваш код уже делает это.await asyncio.gather(*tasks) ожидает предоставленных задач и возвращает их результаты в том же порядке.

Обратите внимание, что await asyncio.gather(*[task]) не имеет смысла, потому что это эквивалентно await asyncio.gather(task), что снова эквивалентно await task,Другими словами, когда вам нужен результат getRequests(data, dataPoints), вы можете написать res = await getRequests(data, dataPoints) без церемонии первого вызова ensure_future() и последующего вызова gather().

Фактически вам почти никогда не нужноВызовите ensure_future самостоятельно:

  • , если вам нужно дождаться выполнения нескольких задач, вы можете передать объекты сопрограммы непосредственно в gather, например, gather(coroutine1(), coroutine2()).
  • , если вам нужно порождатьВ качестве фоновой задачи вы можете позвонить asyncio.create_task(coroutine(...))

Как aiohttp обрабатывает запросы, которые приходят немного позже, чем другие?Разве он не должен удерживать весь пакет вместо того, чтобы забыть об этом?

Если вы используете gather, все запросы должны завершиться, прежде чем какой-либо из них вернется.(Это не политика aiohttp, это то, как работает gather.) Если вам нужно реализовать тайм-аут, вы можете использовать asyncio.wait_for или аналогичный.

...