Asyncio в сопрограмме RuntimeError: нет цикла обработки событий - PullRequest
1 голос
/ 09 ноября 2019

Я пишу многопроцессорный код, который отлично работает в Python 3.7. Тем не менее, я хочу, чтобы один из параллельных процессов выполнял процессы ввода-вывода, когда-либо использующие AsyncIO, чтобы повысить производительность, но не смог запустить его.

Ubuntu 18.04, Python 3.7, AsyncIO, pipenv (все библиотеки pip установлены)

В частности, метод выполняется, как и ожидалось, с использованием многопоточности, и это то, что я хочу заменить на AsyncIO.

Я прогуглил и попытался зациклить в основном () и теперь только в намеченной подпрограмме, мы рассмотрели примеры и прочитали об этом новом асинхронном способе избавления от проблем и пока безрезультатно.

Ниже приведен код app.py, который подвергается преследованию: python app.py

import sys
import traceback
import logging
import asyncio

from config import DEBUG
from config import log_config
from <some-module> import <some-class>

if DEBUG:
    logging.config.dictConfig(log_config())
else:
    logging.basicConfig(
        level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
logger = logging.getLogger(__name__)


def main():
    try:
        <some> = <some-class>([
            'some-data1.csv',
            'some-data2.csv'
            ])
        <some>.run()

    except:

        traceback.print_exc()
        pdb.post_mortem()

    sys.exit(0)


if __name__ == '__main__':

    asyncio.run(main())

Вот код, в котором определен данный класс

    _sql_client = SQLServer()
    _blob_client = BlockBlobStore()
    _keys = KeyVault()
    _data_source = _keys.fetch('some-data')
    #  Multiprocessing
    _manager = mp.Manager()
    _ns = _manager.Namespace()

    def __init__(self, list_of_collateral_files: list) -> None:

    @timeit
    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _load_original_bids(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _merge_bids_with_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_collaterial_per_month(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _calc_bid_per_path(self) -> None:

    @timeit
    def run(self) -> None:

Метод, содержащий асинхронный код, находится здесь:

    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

        all_files = self._blob_client.download_blobs(self._list_of_blob_files)

        _all_dfs = pd.DataFrame()
        async def read_task(file_: str) -> None:
            nonlocal _all_dfs
            df = pd.read_csv(StringIO(file_.content))
            _all_dfs = _all_dfs.append(df, sort=False)

        tasks = []
        loop = asyncio.new_event_loop()

        for file_ in all_files:
            tasks.append(asyncio.create_task(read_task(file_)))

        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

        _all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
        ns.dfs = _all_dfs

И метод, который вызывает конкретную последовательность, и этот асинхронный метод:

    def run(self) -> None:
        extract = []
        extract.append(mp.Process(target=self._get_filter_collateral, args=(self._ns, )))
        extract.append(mp.Process(target=self._get_hours, args=(self._ns, )))
        extract.append(mp.Process(target=self._load_original_bids, args=(self._ns, )))

        #  Start the parallel processes
        for process in extract:
            process.start()

        #  Await for database process to end
        extract[1].join()
        extract[2].join()

        #  Merge both database results
        self._merge_bids_with_hours(self._ns)

        extract[0].join()

        self._get_collaterial_per_month(self._ns)
        self._calc_bid_per_path()
        self._save_reports()
        self._upload_data()

Вот ошибки, которые я получаю:

Process Process-2:
Traceback (most recent call last):
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 104, in _get_filter_collateral
    tasks.append(asyncio.create_task(read_task(file_)))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/asyncio/tasks.py", line 350, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
  traceback.print_exc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG Calculating monthly collateral...
Traceback (most recent call last):
  File "app.py", line 25, in main
    caiso.run()
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 425, in run
    self._get_collaterial_per_month(self._ns)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 196, in _get_collaterial_per_month
    credit_margin = ns.dfs
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 834, in _callmethod
    raise convert_to_error(kind, result)
AttributeError: 'Namespace' object has no attribute 'dfs'
> <some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py(834)_callmethod()
-> raise convert_to_error(kind, result)
(Pdb)

СпасибоальЯ на ваше время !!!

...