Асинхронное постоянство в Python - PullRequest
0 голосов
/ 03 мая 2019

Мне нужно ускорить выполнение скрипта Python, который читает большой CSV-файл кусками, выполняет некоторую обработку, а затем сохраняет обработанные строки в базе данных. Требуется сравнимое время (1,5 с) для обработки 10000 строк, а затем для их сохранения. Времена действительно имеют некоторые колебания, конечно, иногда обработка происходит быстрее, иногда сохраняется.

К сожалению, обработка записей не может быть легко распараллелена, потому что обработка является исторической (записи являются сделками с акциями, и есть расчеты, основанные на предыдущей операции). Возможно, но для этого вопроса можно сделать что-то, чтобы распараллелить обработку фрагмента с сохранением результатов предыдущего фрагмента. Это должно вдвое сократить общее время.

for chunk in pd.read_csv(filename, chunksize=chunksize):
    # the following two tasks in parallel
    persist (rows_from_previous_chunk) # this is I/O waiting, mostly
    rows_to_save = process(chunk)      # this is Python, not C
    # wait for the above to finish
    rows_from_previous_chunk = rows_to_save

Мой вопрос о том, какие рекомендуемые способы сделать выше. Я могу думать о нескольких:

  1. Учитывая, что одной задачей является в основном ожидание ввода-вывода, есть вероятность, что я могу использовать многопоточность, не сталкиваясь с конфликтом GIL.

  2. Второй вариант - использовать Dask, а именно Delayed . Однако, учитывая короткое время, используемое каждой задачей (до 2 секунд), я не уверен, что это лучший подход.

  3. Третий вариант состоит в том, чтобы один процесс считывал и обрабатывал строки, а затем отправлял их через ограниченную очередь в отдельную, которая будет сохранять данные в БД. Использовать очередь JMS излишне, я думаю о multiprocessing.Queue()

Любой совет приветствуется. Я давний программист на Java, который недавно переключился на Python и научился жить с GIL, поэтому возникает вопрос.

Ответы [ 3 ]

1 голос
/ 03 мая 2019

Dask добавляет накладных расходов, но очень мало по сравнению с обычным временем выполнения задачи 2s.Для поддержания порядка каждая задача может зависеть от предыдущей.Вот удар в это

@dask.delayed
def process_save(rows_from_previous_chunk, chunk):
    if rows_from_previous_chunk:
        persist(rows_from_previous_chunk)
    return process(chunk)

parts = dd.read_csv(filename, chunksize=chunksize).to_delayed()

prev = None
for chunk in parts:
    prev = process_save(prev, chunk)
out = dask.delayed(persist)(prev)
dask.compute(out)

out.visualize()  # should look interesting
0 голосов
/ 08 мая 2019

Я остановился на следующем подходе. Интересно, что использование многопоточности не сработало, как ожидалось; передача кадра данных в другую очередь для сохранения по-прежнему блокировала основной поток от продолжения работы. Не уверен на 100%, что происходит, но в интересах времени я переключился на использование процессов, и это работает. Код немного упрощен для ясности ниже, в действительности я использовал несколько рабочих процессов БД.

import multiprocessing

# this function will run into a separate process, saving the df asynchronously
def save(queue):
    db_engine = create_engine(...)
    while True:
        df  = queue.get()
        if df is None:
            break
        df.to_sql(schema="...", name="...", con=db_engine, if_exists="append", chunksize=1000, index=False)
        queue.task_done()

if __name__ == '__main__':

    queue = multiprocessing.JoinableQueue(maxsize=2) 
    worker = multiprocessing.Process(name="db_worker", target=save, args=(queue,))
    worker.daemon = True
    workers.start()

    # inside the main loop to process the df
        queue.put(df_to_save)

    # at the end 
    worker.join()  # wait for the last save job to finish before terminating the main process
0 голосов
/ 03 мая 2019

Это может зависеть от вашей базы данных, но если она существует, самым простым способом может быть использование асинхронной библиотеки, такой как aiomysql или asyncpg , чтобы вы могли выполнять запросы вставки в фоновом режиме ,

Часть, связанная с вводом / выводом, может выполняться без блокировки GIL, поэтому ваша часть кода на Python сможет продолжаться.

...