Я видел несколько других постов о том, что pandas не поддерживает asyncio, и, возможно, я просто упускаю большую картину, но это не должно иметь значения, если я выполняю операции на месте, верно?
Операции на месте - это те, которые изменяют существующие данные .Это вопрос эффективности, тогда как ваша цель - распараллеливание, совсем другое дело.
Pandas не поддерживает asyncio не только потому, что это еще не реализовано, но и потому, что Pandas обычно не поддерживаетВид операций, которые хорошо поддерживает Asyncio: IO сети и подпроцесса.Функции Pandas либо используют ЦП, либо ждут доступа к диску, но ни одна из них не подходит для asyncio.Asyncio позволяет выразить сетевое взаимодействие с сопрограммами, которые выглядят как обычный синхронный код.Внутри сопрограммы каждая операция блокировки (например, чтение по сети) имеет значение await
, что автоматически приостанавливает всю задачу, если данные еще не доступны.При каждом таком приостановлении система переключается на следующую задачу, эффективно создавая совместную многозадачную систему.
При попытке вызвать библиотеку, не поддерживающую асинхронность, такую как панды, на первый взгляд все будет работать, но вы не получите никакой выгоды, и код будет работать последовательно.Например:
async def loop_dfs(dfs):
async def clean_df(df):
...
tasks = [clean_df(df) for (table, df) in dfs.items()]
completed, pending = await asyncio.wait(tasks)
Поскольку clean_df
не содержит ни одного экземпляра await
, это сопрограмма только по имени - она фактически никогда не приостанавливает свое выполнение, чтобы позволить другим сопрограммам выполняться.Таким образом, await asyncio.wait(tasks)
будет запускать задачи последовательно, как если бы вы написали:
for table, df in dfs.items():
clean_df(df)
Чтобы заставить вещи работать параллельно (при условии, что pandas иногда выпускает GIL во время своих операций),Вы должны передать отдельные функции, связанные с ЦП, в пул потоков:
async def loop_dfs(dfs):
def clean_df(df): # note: ordinary def
...
loop = asyncio.get_event_loop(0
tasks = [loop.run_in_executor(clean_df, df)
for (table, df) in dfs.items()]
completed, pending = await asyncio.wait(tasks)
Если вы идете по этому маршруту, вам не понадобится asyncio, вы можете просто использовать concurrent.futures
.Например:
def loop_dfs(dfs): # note: ordinary def
def clean_df(df):
...
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(clean_df, df)
for (table, df) in dfs.items()]
concurrent.futures.wait(futures)
решил, что сначала попробую, но продолжаю получать RuntimeError: Event loop already running
Эта ошибка обычно означает, что вы запустили скрипт всреда, которая уже работает для вас asyncio, такая как блокнот jupyter.Если это так, убедитесь, что вы запустили свой скрипт с запасом python
, или обратитесь к документации вашего ноутбука, как изменить код для отправки сопрограмм в цикл событий, который уже выполняется.