Asyncio Pandas с на месте - PullRequest
       7

Asyncio Pandas с на месте

0 голосов
/ 15 сентября 2018

Я просто прочитал это введение , но у меня возникли проблемы с реализацией любого из примеров (второй пример - прокомментированный код):

import asyncio
import pandas as pd
from openpyxl import load_workbook

async def loop_dfs(dfs):
    async def clean_df(df):
        df.drop(["column_1"], axis=1, inplace=True)
        ... a bunch of other inplace=True functions ...
        return "Done"

    # tasks = [clean_df(df) for (table, dfs) in dfs.items()]
    # await asyncio.gather(*tasks)

    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)


def main():
    dfs = {
        sn: pd.read_excel("excel.xlsx", sheet_name=sn)
        for sn in load_workbook("excel.xlsx").sheetnames
    }

    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(loop_dfs(dfs))

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(loop_dfs(dfs))
    finally:
        loop.close()

main()

Я видел несколько других постов о том, какPandas не поддерживает Asyncio, и, возможно, я просто упускаю более широкую картину, но это не должно иметь значения, если я правильно выполняю операции на месте? Я видел рекомендации для Dask , но без немедленной поддержки чтения Excel, подумал, что сначала попробую, но продолжаю получать

RuntimeError: Event loop already running

1 Ответ

0 голосов
/ 15 сентября 2018

Я видел несколько других постов о том, что pandas не поддерживает asyncio, и, возможно, я просто упускаю большую картину, но это не должно иметь значения, если я выполняю операции на месте, верно?

Операции на месте - это те, которые изменяют существующие данные .Это вопрос эффективности, тогда как ваша цель - распараллеливание, совсем другое дело.

Pandas не поддерживает asyncio не только потому, что это еще не реализовано, но и потому, что Pandas обычно не поддерживаетВид операций, которые хорошо поддерживает Asyncio: IO сети и подпроцесса.Функции Pandas либо используют ЦП, либо ждут доступа к диску, но ни одна из них не подходит для asyncio.Asyncio позволяет выразить сетевое взаимодействие с сопрограммами, которые выглядят как обычный синхронный код.Внутри сопрограммы каждая операция блокировки (например, чтение по сети) имеет значение await, что автоматически приостанавливает всю задачу, если данные еще не доступны.При каждом таком приостановлении система переключается на следующую задачу, эффективно создавая совместную многозадачную систему.

При попытке вызвать библиотеку, не поддерживающую асинхронность, такую ​​как панды, на первый взгляд все будет работать, но вы не получите никакой выгоды, и код будет работать последовательно.Например:

async def loop_dfs(dfs):
    async def clean_df(df):
        ...    
    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)

Поскольку clean_df не содержит ни одного экземпляра await, это сопрограмма только по имени - она ​​фактически никогда не приостанавливает свое выполнение, чтобы позволить другим сопрограммам выполняться.Таким образом, await asyncio.wait(tasks) будет запускать задачи последовательно, как если бы вы написали:

for table, df in dfs.items():
    clean_df(df)

Чтобы заставить вещи работать параллельно (при условии, что pandas иногда выпускает GIL во время своих операций),Вы должны передать отдельные функции, связанные с ЦП, в пул потоков:

async def loop_dfs(dfs):
    def clean_df(df):  # note: ordinary def
        ...
    loop = asyncio.get_event_loop(0
    tasks = [loop.run_in_executor(clean_df, df)
             for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)

Если вы идете по этому маршруту, вам не понадобится asyncio, вы можете просто использовать concurrent.futures.Например:

def loop_dfs(dfs):  # note: ordinary def
    def clean_df(df):
        ...
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(clean_df, df)
                   for (table, df) in dfs.items()]
        concurrent.futures.wait(futures)

решил, что сначала попробую, но продолжаю получать RuntimeError: Event loop already running

Эта ошибка обычно означает, что вы запустили скрипт всреда, которая уже работает для вас asyncio, такая как блокнот jupyter.Если это так, убедитесь, что вы запустили свой скрипт с запасом python, или обратитесь к документации вашего ноутбука, как изменить код для отправки сопрограмм в цикл событий, который уже выполняется.

...