Как загрузить несколько файлов, используя asyncio и wget в python? - PullRequest
0 голосов
/ 08 апреля 2020

Я хочу скачать много файлов с дукаскопы. Типичный URL выглядит следующим образом.

url = 'http://datafeed.dukascopy.com/datafeed/AUDUSD/2014/01/02/00h_ticks.bi5'

Я попробовал ответить здесь , но большинство файлов имеют размер 0.

Но когда я просто зациклился с помощью wget (см. ниже), я получил полные файлы.

import wget
from urllib.error import HTTPError

pair = 'AUDUSD'
for year in range(2014,2015):
    for month in range(1,13):
        for day in range(1,32):
            for hour in range(24): 
                try:
                    url = 'http://datafeed.dukascopy.com/datafeed/' + pair + '/' + str(year) + '/' + str(month-1).zfill(2) + '/' + str(day).zfill(2) + '/' + str(hour).zfill(2) + 'h_ticks.bi5'
                    filename = pair + '-' + str(year) + '-' + str(month-1).zfill(2) + '-' + str(day).zfill(2) + '-' + str(hour).zfill(2) + 'h_ticks.bi5'
                    x = wget.download(url, filename)
#                     print(url)
                except HTTPError as err:
                    if err.code == 404:
                        print((year, month,day, hour))
                    else:
                        raise

Ранее я использовал следующий код для очистки веб-сайтов, но не для загрузки файлов.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from aiohttp import ClientSession, client_exceptions
from asyncio import Semaphore, ensure_future, gather, run
from json import dumps, loads

limit = 10
http_ok = [200]


async def scrape(url_list):
    tasks = list()
    sem = Semaphore(limit)

    async with ClientSession() as session:
        for url in url_list:
            task = ensure_future(scrape_bounded(url, sem, session))
            tasks.append(task)

        result = await gather(*tasks)

    return result


async def scrape_bounded(url, sem, session):
    async with sem:
        return await scrape_one(url, session)


async def scrape_one(url, session):
    try:
        async with session.get(url) as response:
            content = await response.read()
    except client_exceptions.ClientConnectorError:
        print('Scraping %s failed due to the connection problem', url)
        return False

    if response.status not in http_ok:
        print('Scraping%s failed due to the return code %s', url, response.status)
        return False

    content = loads(content.decode('UTF-8'))

    return content

if __name__ == '__main__':
    urls = ['http://demin.co/echo1/', 'http://demin.co/echo2/']
    res = run(scrape(urls))

    print(dumps(res, indent=4))

Существует ответ для загрузки нескольких файлов с использованием многопроцессорной обработки здесь . Но я думаю, что asyncio может быть быстрее.

Когда возвращаются файлы размером 0, это может быть сервер, ограничивающий количество запросов, но я все еще хотел бы изучить, есть ли возможность загрузки нескольких файлов с использованием wget и asyncio.

1 Ответ

1 голос
/ 09 апреля 2020

Вот пример. Декодирование / кодирование, а также операции записи должны быть зафиксированы в зависимости от целевого типа данных.

   #!/usr/bin/env python3
# -*- coding: utf-8 -*-

from aiofile import AIOFile
from aiohttp import ClientSession
from asyncio import ensure_future, gather, run, Semaphore
from calendar import monthlen
from lzma import open as lzma_open
from struct import calcsize, unpack
from io import BytesIO
from json import dumps

http_ok = [200]
limit = 5
base_url = 'http://datafeed.dukascopy.com/datafeed/{}/{}/{}/{}/{}h_ticks.bi5'
fmt = '>3i2f'
chunk_size = calcsize(fmt)


async def download():
    tasks = list()
    sem = Semaphore(limit)

    async with ClientSession() as session:
        for pair in ['AUDUSD']:
            for year in [2014, 2015]:
                for month in range(1, 12):
                    for day in range(1, monthlen(year, month)):
                        for hour in range(0, 23):
                            tasks.append(ensure_future(download_one(pair=pair,
                                                                    year=str(year).zfill(2),
                                                                    month=str(month).zfill(2),
                                                                    day=str(day).zfill(2),
                                                                    hour=str(hour).zfill(2),
                                                                    session=session,
                                                                    sem=sem)))
        return await gather(*tasks)


async def download_one(pair, year, month, day, hour, session, sem):
    url = base_url.format(pair, year, month, day, hour)
    data = list()

    async with sem:
        async with session.get(url) as response:
            content = await response.read()

        if response.status not in http_ok:
            print(f'Scraping {url} failed due to the return code {response.status}')
            return

        if content == b'':
            print(f'Scraping {url} failed due to the empty content')
            return

        with lzma_open(BytesIO(content)) as f:
            while True:
                chunk = f.read(chunk_size)
                if chunk:
                    data.append(unpack(fmt, chunk))
                else:
                    break

        async with AIOFile(f'{pair}-{year}-{month}-{day}-{hour}.bi5', 'w') as fl:
            await fl.write(dumps(data, indent=4))

        return


if __name__ == '__main__':
    run(download())

Исходный код доступен здесь

...