Я пишу многопроцессорный код, который отлично работает в Python 3.7. Тем не менее, я хочу, чтобы один из параллельных процессов выполнял процессы ввода-вывода, когда-либо использующие AsyncIO, чтобы повысить производительность, но не смог запустить его.
Ubuntu 18.04, Python 3.7, AsyncIO, pipenv (все библиотеки pip установлены)
В частности, метод выполняется, как и ожидалось, с использованием многопоточности, и это то, что я хочу заменить на AsyncIO.
Я прогуглил и попытался зациклить в основном () и теперь только в намеченной подпрограмме, мы рассмотрели примеры и прочитали об этом новом асинхронном способе избавления от проблем и пока безрезультатно.
Ниже приведен код app.py, который подвергается преследованию: python app.py
import sys
import traceback
import logging
import asyncio
from config import DEBUG
from config import log_config
from <some-module> import <some-class>
if DEBUG:
logging.config.dictConfig(log_config())
else:
logging.basicConfig(
level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
logger = logging.getLogger(__name__)
def main():
try:
<some> = <some-class>([
'some-data1.csv',
'some-data2.csv'
])
<some>.run()
except:
traceback.print_exc()
pdb.post_mortem()
sys.exit(0)
if __name__ == '__main__':
asyncio.run(main())
Вот код, в котором определен данный класс
_sql_client = SQLServer()
_blob_client = BlockBlobStore()
_keys = KeyVault()
_data_source = _keys.fetch('some-data')
# Multiprocessing
_manager = mp.Manager()
_ns = _manager.Namespace()
def __init__(self, list_of_collateral_files: list) -> None:
@timeit
def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _get_hours(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _load_original_bids(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _merge_bids_with_hours(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _get_collaterial_per_month(self, ns: mp.managers.NamespaceProxy) -> None:
@timeit
def _calc_bid_per_path(self) -> None:
@timeit
def run(self) -> None:
Метод, содержащий асинхронный код, находится здесь:
def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:
all_files = self._blob_client.download_blobs(self._list_of_blob_files)
_all_dfs = pd.DataFrame()
async def read_task(file_: str) -> None:
nonlocal _all_dfs
df = pd.read_csv(StringIO(file_.content))
_all_dfs = _all_dfs.append(df, sort=False)
tasks = []
loop = asyncio.new_event_loop()
for file_ in all_files:
tasks.append(asyncio.create_task(read_task(file_)))
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
_all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
ns.dfs = _all_dfs
И метод, который вызывает конкретную последовательность, и этот асинхронный метод:
def run(self) -> None:
extract = []
extract.append(mp.Process(target=self._get_filter_collateral, args=(self._ns, )))
extract.append(mp.Process(target=self._get_hours, args=(self._ns, )))
extract.append(mp.Process(target=self._load_original_bids, args=(self._ns, )))
# Start the parallel processes
for process in extract:
process.start()
# Await for database process to end
extract[1].join()
extract[2].join()
# Merge both database results
self._merge_bids_with_hours(self._ns)
extract[0].join()
self._get_collaterial_per_month(self._ns)
self._calc_bid_per_path()
self._save_reports()
self._upload_data()
Вот ошибки, которые я получаю:
Process Process-2:
Traceback (most recent call last):
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
result = method(*args, **kwargs)
File "<some-path>/src/azure/application/caiso/main.py", line 104, in _get_filter_collateral
tasks.append(asyncio.create_task(read_task(file_)))
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/asyncio/tasks.py", line 350, in create_task
loop = events.get_running_loop()
RuntimeError: no running event loop
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
traceback.print_exc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG Calculating monthly collateral...
Traceback (most recent call last):
File "app.py", line 25, in main
caiso.run()
File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
result = method(*args, **kwargs)
File "<some-path>/src/azure/application/caiso/main.py", line 425, in run
self._get_collaterial_per_month(self._ns)
File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
result = method(*args, **kwargs)
File "<some-path>/src/azure/application/caiso/main.py", line 196, in _get_collaterial_per_month
credit_margin = ns.dfs
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
return callmethod('__getattribute__', (key,))
File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 834, in _callmethod
raise convert_to_error(kind, result)
AttributeError: 'Namespace' object has no attribute 'dfs'
> <some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py(834)_callmethod()
-> raise convert_to_error(kind, result)
(Pdb)
СпасибоальЯ на ваше время !!!