По сути, есть небольшое приложение aiohttp, которое получает список запросов Impala, а затем отправляет их Impala. Однако для выполнения некоторых запросов может потребоваться много времени, поэтому было решено сделать это асинхронным / параллельным способом. Есть одно решение с работающими потоками, но хотелось бы посмотреть, можно ли достичь такой же скорости, используя только asyncio / tornado.
Мой код следующий:
async def run(self, queries):
# Here I validate queries
query_list = await self.build_query_list(split_queries) # Format: [[queries for connection_1], [queries for connection_2], ...]
start = time.time()
# Assing group of queries to each connection and wait results
result_queue = deque()
await multi([self.execute_impala_query(connection.connection, query_list[index], result_queue) for index, connection in enumerate(connection_list)])
# Close all connections
[await self.impala_connect_advance_pool.release_connection(connection) for connection in connection_list]
# Wait for Impala responses
while len(result_queue) < connect_limit:
continue
# Send results back
async def execute_impala_query(self, impala_connect, queries, queue):
return await multi([self.impala_response_to_json_response(impala_connect.cursor(), query, queue) for query in queries])
async def impala_response_to_json_response(self, impala_cursor, query, queue):
self.logger.info('execute query: {}'.format(query))
print ('execute query: {}'.format(query))
def get_results():
impala_cursor.execute(query)
results = as_pandas(impala_cursor)
impala_cursor.close()
self.logger.info('{} completed'.format(query))
print ('{} completed'.format(query))
queue.append(results.to_json(orient='records'))
IOLoop.current().spawn_callback(get_results)
Что происходит, так это то, что после запуска я вижу сообщения «выполнить запрос: запрос», выводимые на стандартный вывод, и я бы предположил, что все они запускаются и выполняются, однако для этого требуется 2 (или больше). ) пока версия с потоками. Я неправильно понял всю концепцию асинхронности или получил какую-то глупую ошибку где-то в методах?