django и asyncio - асинхронно извлекают данные из удаленной конечной точки REST - PullRequest
0 голосов
/ 27 августа 2018

Я пытаюсь переписать команду управления django асинхронным способом, используя asyncio и aiohttp.Это следующие файлы:

# rest_async.py
async def t_search_coro(token, loop, **kwargs):
    """
    ws T Search Query:
    kwargs:
    - modification_start_date: (str) Format: YYYY-MM-DDTHH:MM:SS (e.g.: 2013-02-26T11:00:00)
    - modification_end_date: (str) Format: YYYY-MM-DDTHH:MM:SS (e.g.: 2013-02-26T11:00:00)
    - lo_type: (str) LO Type. Defaults to 'Event'
    - status: (str) T Status of the LO. Required
    - portal: portal. Default: settings.PORTAL
    - page_nr: PageNumber querystring parameter. Default: 1
    """
    path = '/services/api/TSearch'
    method = 'GET'
    modification_start_date = kwargs.pop('modification_start_date')
    modification_end_date = kwargs.pop('modification_end_date')
    lo_type = kwargs.pop('lo_type', 'Event')
    status = kwargs.pop('status')
    portal = kwargs.pop('portal', settings.PORTAL)
    page_nr = kwargs.pop('page_nr', 1)
    debugging = kwargs.pop('debugging', True)
    signature_kws = get_signature_kwargs(token, path, method)
    headers = signature_kws.get('headers')
    params = {
        'LOType': lo_type,
        'Status': status,
        'PageNumber': page_nr,
        'format': 'JSON'
    }
    if modification_start_date is not None:
        params['ModificationStartDate'] = modification_start_date
    if modification_end_date is not None:
        params['ModificationEndDate'] = modification_end_date

    service_end_point = 'https://{}.example.net{}'.format(portal, path)
    print("fetching data: {} - {}".format(modification_start_date, modification_end_date))
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url=service_end_point, params=params, headers=headers) as resp:
            assert resp.status == 200
            return await resp.read()


# utils_async.py

async def fetch_t_data_coro(
        loop, lo_type='Session', modification_start_date=now()-timedelta(hours=22), modification_end_date=now(),
    status='Completed', **kwargs):
    date_fmt = "%Y-%m-%dT%H:%M:%S"
    if (modification_end_date - modification_start_date).total_seconds() > timedelta(days=1).total_seconds():
        raise Exception("modification start/end datetime interval must be within 24 hrs."
                        "\nmod. start date: {}\nmod. end date: {}".format(
            modification_start_date.strftime(date_fmt), modification_end_date.strftime(date_fmt)
        ))
    debugging = kwargs.pop('debugging', False)
    page_nr = kwargs.get('page_nr', 1)
    modification_start_date = modification_start_date.strftime(date_fmt)
    modification_end_date = modification_end_date.strftime(date_fmt)
    rtn_data = []
    params = {
        'LOType': lo_type, 'Status': status, 'PageNumber': page_nr, 'format': 'JSON'
    }
    already_added = set()
    while True:
        data = await rest_async.t_search_coro(
            token, loop, modification_start_date=modification_start_date, modification_end_date=modification_end_date,
            lo_type=lo_type, status=status, page_nr=page_nr, debugging=debugging
        )
        data_dict = json.loads(data.decode('utf-8'))
        if 'data' not in data_dict:
            break
        total_pages = data_dict['data'][0]['T_Item']['TotalPages']
        t_raw_data = data_dict['data'][0]['T_Item']['T']
        for item in t_raw_data:
            _h = hash(json.dumps(item, sort_keys=True))
            if _h in already_added:
                continue
            already_added.add(_h)
            rtn_data.append(item)
        if page_nr >= total_pages:
            break
        page_nr += 1
    return rtn_data


# load_data_async.py (actual django management command)

import asyncio
from datetime import timedelta, datetime
import argparse
import logging

from django.core.management.base import BaseCommand
from django.utils.timezone import now

from myapp.utils_async import fetch_transcript_data_coro

RUNNING_INTERVAL_MINS = 60
logger = logging.getLogger('my_proj')
MAX_BACKDAYS = 160
BACKDAYS_HOURS = {3, 9, 15, 21}
DEFAULT_TIMEFRAME=24
GO_BACK_DAYS = 30
GO_BACK_DAYS_TIMEFRAME = 24


class Command(BaseCommand):
    help = "fetch data asynchrounously"

    def add_arguments(self, parser):
        parser.add_argument(
            '--timeframe', action='store', dest='timeframe', default=DEFAULT_TIMEFRAME, type=int,
            help='Timeframe hours to be used (default to 24, range: 1 to 24)'
        )
        parser.add_argument(
            '--backdays', action='store', dest='backdays', default=None, type=int,
            help='repeat the command execution (for the same timeframe) n days before the current day'
        )

        parser.add_argument('--start-date', type=valid_date_type)
        parser.add_argument('--end-date', type=valid_date_type)

    def handle(self, *args, **options):
        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self._handle(*args, **options))

    async def _handle(self, *args, **options):
        timeframe = options.get('timeframe')
        backdays = options.get('backdays', None)
        start_date = options.get('start_date')
        end_date = options.get('end_date')
        backdays = backdays + 1 if backdays is not None else 1
        if all([start_date is not None, end_date is not None]):
            days_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
        else:
            days_range = [now() - timedelta(days=x) for x in range(backdays)]
        for mod_end_datetime in days_range:
            mod_start_datetime = mod_end_datetime - timedelta(minutes=RUNNING_INTERVAL_MINS * timeframe)
            data = await fetch_t_data_coro(
                loop=self.loop, modification_start_date=mod_start_datetime, modification_end_date=mod_end_datetime
            )

def valid_date_type(arg_date_str):
    try:
        return datetime.strptime(arg_date_str, "%Y-%m-%d")
    except ValueError:
        msg = "Given Date ({0}) not valid! Expected format, YYYY-MM-DD!".format(arg_date_str)
        raise argparse.ArgumentTypeError(msg)

Затем я попытался запустить cmd как:

python manage.py load_data_async --start-date 2018-04-20 --end-date 2018-06-6

, команда выполняется без ошибок, однако из оператора print видно, чтосопрограммы выполняются последовательно, аналогично исходному синхронному коду:

# output
fetching data: 2018-04-19T00:00:00 - 2018-04-20T00:00:00
fetching data: 2018-04-19T00:00:00 - 2018-04-20T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-21T00:00:00 - 2018-04-22T00:00:00
fetching data: 2018-04-21T00:00:00 - 2018-04-22T00:00:00
fetching data: 2018-04-21T00:00:00 - 2018-04-22T00:00:00
fetching data: 2018-04-22T00:00:00 - 2018-04-23T00:00:00
fetching data: 2018-04-23T00:00:00 - 2018-04-24T00:00:00
fetching data: 2018-04-24T00:00:00 - 2018-04-25T00:00:00
fetching data: 2018-04-24T00:00:00 - 2018-04-25T00:00:00
fetching data: 2018-04-25T00:00:00 - 2018-04-26T00:00:00
fetching data: 2018-04-25T00:00:00 - 2018-04-26T00:00:00
fetching data: 2018-04-25T00:00:00 - 2018-04-26T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00

...
...
fetching data: 2018-05-22T00:00:00 - 2018-05-23T00:00:00
fetching data: 2018-05-22T00:00:00 - 2018-05-23T00:00:00
fetching data: 2018-05-23T00:00:00 - 2018-05-24T00:00:00
fetching data: 2018-05-23T00:00:00 - 2018-05-24T00:00:00
fetching data: 2018-05-24T00:00:00 - 2018-05-25T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-26T00:00:00 - 2018-05-27T00:00:00
fetching data: 2018-05-27T00:00:00 - 2018-05-28T00:00:00
fetching data: 2018-05-28T00:00:00 - 2018-05-29T00:00:00
fetching data: 2018-05-29T00:00:00 - 2018-05-30T00:00:00
fetching data: 2018-05-30T00:00:00 - 2018-05-31T00:00:00
fetching data: 2018-05-30T00:00:00 - 2018-05-31T00:00:00
fetching data: 2018-05-30T00:00:00 - 2018-05-31T00:00:00
fetching data: 2018-05-31T00:00:00 - 2018-06-01T00:00:00
fetching data: 2018-05-31T00:00:00 - 2018-06-01T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-02T00:00:00 - 2018-06-03T00:00:00
fetching data: 2018-06-02T00:00:00 - 2018-06-03T00:00:00
fetching data: 2018-06-02T00:00:00 - 2018-06-03T00:00:00
fetching data: 2018-06-03T00:00:00 - 2018-06-04T00:00:00
fetching data: 2018-06-03T00:00:00 - 2018-06-04T00:00:00
fetching data: 2018-06-04T00:00:00 - 2018-06-05T00:00:00
fetching data: 2018-06-04T00:00:00 - 2018-06-05T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00

Кто-нибудь заметил что-то не так?или это правильное поведение?У меня нет опыта работы с asyncio, но я ожидал не последовательного выполнения ...

версия python: 3.6.3

1 Ответ

0 голосов
/ 27 августа 2018

Кажется, что код await fetch_t_data_coro вызывает один за другим, что заставляет их запускаться последовательно.

Чтобы запустить их параллельно, вы можете использовать asyncio.gather:

        coros = []
        for mod_end_datetime in days_range:
            mod_start_datetime = mod_end_datetime - timedelta(minutes=RUNNING_INTERVAL_MINS * timeframe)
            coros.append(fetch_t_data_coro(
                loop=self.loop, modification_start_date=mod_start_datetime, modification_end_date=mod_end_datetime
            ))
        data_list = await asyncio.gather(*coros)

Два несвязанных примечания:

  • Код создает aiohttp.ClientSession в каждом t_search_coro.Это анти-шаблон - вы должны создать один ClientSession на верхнем уровне и передать его отдельным сопрограммам (даже работающим параллельно), чтобы они все использовали один и тот же экземпляр сеанса.
  • Начиная с Python 3.5.3 , asyncio.get_event_loop() будет правильно обрабатывать цикл запуска событий при вызове из сопрограммы.В результате вам не нужно отправлять объект цикла по вызовам сопрограмм, просто вызовите get_event_loop, когда вам это нужно (чего в вашем коде нет, так как ClientSession также правильно выводит цикл событий на егособственный).
...