обрабатывать куски строк с неизвестной длиной - используя async? - PullRequest
0 голосов
/ 12 мая 2018

У меня есть файл с URL в каждой строке. Я выполняю вызов асинхронных запросов, используя aiohttp для этих URL-адресов в пакетном режиме.

Файл огромный, а память маленькая. Я не знаю, сколько строк в файле, и чтение всей строки построчно со счетчиком займет много времени.

Как я могу:

  • захватить 100 000 строк в список
  • обработать эти
  • чтение файла паузы
  • захватить следующие 100 000 строк = повторять, пока я не возьму остаток?

Я думал по принципу асинхронности, но, возможно, я неправильно понял эту библиотеку.

counter = 0
inputs=[]
async with open("test.txt") as f:
    for line in f:
        counter=counter+1
        if counter%100000 != 0:
              inputs.append(line.strip())
        else:
              await get_req_fn(inputs)
              inputs=[]

Ответы [ 2 ]

0 голосов
/ 13 мая 2018

«Пауза», которую вы ищете, это именно то, что делает await.Он блокирует текущую сопрограмму (позволяя другим делать успехи), пока та, которую он ожидает, не завершится.Ваш код в основном правильный, за исключением использования async with вокруг open.Вот более полная версия (не проверенная):

import asyncio, aiohttp
from itertools import islice

BATCH_SIZE = 1000  # start with something reasonable

async def main():
    async with aiohttp.ClientSession() as session:
        with open("test.txt") as f:
            while True:
                # take the next BATCH_SIZE lines
                batch = [line.strip() for line in islice(f, BATCH_SIZE)]
                if not batch:
                    # no more lines - we're done
                    break
                await get_req_fn(batch, session)

При реализации get_req_fn необходимо позаботиться о том, чтобы включить параллельное выполнение, а также дождаться завершения всего пакета.Ключевым компонентом для этого являются комбинаторы сопрограмм, функции, которые объединяют несколько сопрограмм в один ожидаемый объект.Мощный и очень простой в использовании gather:

async def get_req_fn(urls, session):
    coros = []
    for url in urls:
        coros.append(single_req(url, session))
    await asyncio.gather(*coros)

gather запускает заданные сопрограммы и, когда await ed, блокирует текущую, пока всеиз них завершено.Это позволяет await get_req_fn(batch, session) приостанавливать чтение до тех пор, пока не будет загружен весь пакет.

Наконец, single_req может выглядеть следующим образом:

async def single_req(url, session):
    try:
        async with session.get(url) as resp:
            text = await resp.text()
            # process text, or save it to a file, etc
    except IOError as e:
        print(f'error fetching {url}: {e}')

Все функции принимают объект сеанса создан в main(), потому что создание новой сессии для каждого запроса настоятельно не рекомендуется в документации.

Наконец, чтобы запустить все это, используйте что-то вроде:

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
0 голосов
/ 13 мая 2018

Чтение и обработка прочитанных данных асинхронно не делает это быстрее.Вместо этого вы можете использовать async, когда у вас есть длинная задача, такая как обработка данных, и вы хотите визуализировать что-то при этом.

Я бы просто прочитал файл в память, а затем обработал данные.Конечно, если вы не можете сделать это, потому что это не помещается в памяти, я бы использовал await, который @ user4815162342 упомянул в своем ответе.

...