Параллельная загрузка входных файлов в Pandas Dataframe - PullRequest
0 голосов
/ 22 января 2019

У меня есть требование, в котором у меня есть три входных файла, и мне нужно загрузить их внутри фрейма данных Pandas, прежде чем объединить два файла в один фрейм данных.

Расширение файла всегда изменяется, это может быть .txt один раз и .xlsx или .csv в другой раз.

Как я могу запустить этот процесс параллельно, чтобы сэкономить время ожидания / загрузки?

Это мой код на данный момент,

from time import time # to measure the time taken to run the code
start_time = time()

Primary_File = "//ServerA/Testing Folder File Open/Report.xlsx"
Secondary_File_1 = "//ServerA/Testing Folder File Open/Report2.csv"
Secondary_File_2 = "//ServerA/Testing Folder File Open/Report2.csv"

import pandas as pd # to work with the data frames
Primary_df = pd.read_excel (Primary_File)
Secondary_1_df = pd.read_csv (Secondary_File_1)
Secondary_2_df = pd.read_csv (Secondary_File_2)

Secondary_df = Secondary_1_df.merge(Secondary_2_df, how='inner', on=['ID'])
end_time = time()

print(end_time - start_time)

Мне требуется около 20 минут, чтобы загрузить мои primary_df и second_df. Итак, я ищу эффективное решение, возможно, использующее параллельную обработку для экономии времени. Я рассчитал операцию чтения, и это занимает большую часть времени примерно 18 минут 45 секунд.

Аппаратная конфигурация: - Процессор Intel i5, 16 ГБ оперативной памяти и 64-разрядная ОС

Вопрос сделан Право на получение награды: - Как я ищу работу код с подробными инструкциями - используя пакет с анакондой окружение , которое поддерживает загрузку моих входных файлов Parallel и хранить их в фрейме данных панд отдельно. Это должно в конечном итоге сэкономить время.

Ответы [ 5 ]

0 голосов
/ 31 января 2019

Вы даете подробности об оборудовании, но не даете самое интересное: количество дисков, тип RAID и файловую систему, с которой вы читаете.

Если у вас есть только один диск, нет RAID и обычная файловая система (ext4, XFS и т. Д.), Как у вас, в основном, на ноутбуках, вы не сможете увеличить пропускную способность, просто выбрасывая процессоры (многопоточные или многопроцессорные). ) на проблему. Использование нескольких потоков или асинхронного ввода-вывода поможет немного замаскировать задержку, но не увеличит пропускную способность, так как есть вероятность, что вы уже насыщаете ее одним процессом чтения.

Итак, используя код, предложенный @ Cezary.Sz, попробуйте переместить один из файлов на внешнее запоминающее устройство USB3.0 или в хранилище SDSX. Если вы работаете на большой рабочей станции, посмотрите сведения об оборудовании, чтобы увидеть, доступно ли несколько дисков, а если вы работаете в большом кластере, найдите параллельную файловую систему (BeeGFS, Luster и т. Д.)

0 голосов
/ 30 января 2019

К сожалению, из-за GIL (Global Interpreter Lock) в Python, несколько потоков не работают одновременно - все потоки используют одно и то же ядро ​​одного процессора . Это означает, что если вы создадите несколько потоков для загрузки ваших файлов, общее время будет равно (или даже больше) времени, необходимому для загрузки этих файлов один за другим.

Подробнее о GIL: https://wiki.python.org/moin/GlobalInterpreterLock

Чтобы ускорить загрузку, вы можете попробовать переключиться с csv / excel на маринованные файлы (или HDF).

0 голосов
/ 29 января 2019

Попробуйте использовать код @ Cezary.Sz, но используйте (удалите вызовы на .get()) вместо:

Primary_df_job = pool.apply_async(import_xlsx, (Primary_File, ))
Secondary_1_df_job = pool.apply_async(import_csv, (Secondary_File_1, ))
Secondary_2_df_job = pool.apply_async(import_csv, (Secondary_File_2, ))

Затем

Secondary_1_df = Secondary_1_df_job.get()
Secondary_2_df = Secondary_2_df_job.get()

И вы можете использовать кадры данных,в то время как Primary_df_job загружается.

Secondary_df = Secondary_1_df.merge(Secondary_2_df, how='inner', on=['ID'])

Когда вам нужно Primary_df в вашем коде, используйте

Primary_df = Primary_df_job.get()

Это будет блокировать выполнение до завершения Primary_df_job.

0 голосов
/ 30 января 2019

Почему бы не использовать asyncio сверх multiprocessing?

Вместо использования нескольких потоков, вы можете сначала использовать уровень ввода / вывода с помощью Async CSV Dict Reader (который может быть распараллелен с использованием multiprocessing для нескольких файлов).После этого вы можете либо объединить слова, а затем загрузить эти словари в панды или загрузить отдельные слова в панды и выполнить конкататацию там.Однако pandas не поддерживает asyncio, поэтому в какой-то момент вы потеряете производительность.

0 голосов
/ 24 января 2019

Попробуйте это:

from time import time 
import pandas as pd
from multiprocessing.pool import ThreadPool


start_time = time()

pool = ThreadPool(processes=3)

Primary_File = "//ServerA/Testing Folder File Open/Report.xlsx"
Secondary_File_1 = "//ServerA/Testing Folder File Open/Report2.csv"
Secondary_File_2 = "//ServerA/Testing Folder File Open/Report2.csv"


# Define a function for the thread
def import_xlsx(file_name):
    df_xlsx = pd.read_excel(file_name)
    # print(df_xlsx.head())
    return df_xlsx


def import_csv(file_name):
    df_csv = pd.read_csv(file_name)
    # print(df_csv.head())
    return df_csv

# Create two threads as follows

Primary_df = pool.apply_async(import_xlsx, (Primary_File, )).get() 
Secondary_1_df = pool.apply_async(import_csv, (Secondary_File_1, )).get() 
Secondary_2_df = pool.apply_async(import_csv, (Secondary_File_2, )).get() 

Secondary_df = Secondary_1_df.merge(Secondary_2_df, how='inner', on=['ID'])
end_time = time()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...