Как читать из нескольких разных источников одновременно? - PullRequest
0 голосов
/ 28 мая 2020

Я извлекаю данные из 3 источников данных: CSV_1, CSV_2 и DB 1. CSV_1 и CSV_2 имеют разные схемы и кодировки. Я использую pandas.read_csv для чтения CSV pandas.read_sql для извлечения из БД. Я новичок в параллелизме / параллелизме, но, насколько я понимаю, поскольку ввод-вывод является моим ограничением, многопоточность может помочь мне добиться увеличения скорости.

Я думал, что смогу использовать concurrent.futures.ThreadPoolExecutor и его метод карты для чтения csvs параллельно, например:

files = ['csv_1.csv', 'csv_2.csv']

with ThreadPoolExecutor(2) as executor:
    results = executor.map(pd.read_csv, files)

, но из того, что я видел, так как мне нужно применить различные параметры к read_csv (т.е. кодировку и dtypes), это не сработает. Есть ли способ использовать одну и ту же функцию с разными параметрами для каждой итерации? Только эта часть, асинхронное чтение двух разных csvs, требующих разных параметров pd.read_csv, будет огромной победой.

В идеале я бы также хотел добавить третий поток, который читает из БД.

Есть ли способ добиться этого?

Ответы [ 2 ]

2 голосов
/ 28 мая 2020

Есть несколько способов сделать это. Вам просто нужна временная функция, которая знает, как расширить единственный параметр, используемый map, в нужный вам вызов. В первом примере у вас может быть несколько консервированных типов. Это приятно, потому что легко увидеть параметры, которые вам нравятся для каждого типа

def csv_reader(params):
    filename, csv_type = *params
    if csv_type == 'footype':
        return pd.read_csv(filename, sep="|")
    elif csv_type == 'bartype':
        return pd.read_csv(filename, columns=["A", "B", "C"])

files = [('csv_1.csv', 'footype'), ('csv_2.csv', 'bartype')]

with ThreadPoolExecutor(2) as executor:
    results = executor.map(csv_reader, files)

Но вы всегда можете сделать его общим c

def csv_reader_generic(params):
    filename, args, kw = *params
    return pd.read_csv(filename, *args, **kwargs)

files = [('csv_1.csv', tuple(), {"sep":"|"}), 
    ('csv_2.csv', tuple(), {"columns":["A", "B", "C"]})]

with ThreadPoolExecutor(2) as executor:
    results = executor.map(csv_reader_generic, files)
0 голосов
/ 28 мая 2020
Подробный ответ

tdelaney охватывает ваш заданный вопрос, но, чтобы предложить другое предложение, отказ от CSV, поскольку ваш формат хранения может иметь еще большее значение, если у вас есть контроль над созданием файла.

Используя умеренно большие фреймы данных, которые я сгенерировал ниже, запись 2 CSV занимает 56,2 секунды, а чтение занимает 5,66 секунды последовательно или 3,06 с использованием потоков. Однако при переключении на Parquet запись занимает 2 секунды, а чтение - 317 мс, а размер файла сокращается менее чем вдвое.

In [282]: %time df1 = pd.DataFrame(np.random.random((1000000, 20)))
Wall time: 174 ms

In [283]: %time df2 = pd.DataFrame(np.random.random((1000000, 20)))
Wall time: 190 ms

In [284]: %time df1.to_csv("test1.csv", index=False)
Wall time: 28 s

In [285]: %time df2.to_csv("test2.csv", index=False)
Wall time: 28.2 s

In [286]: %time df1 = pd.read_csv("test1.csv"); df2 = pd.read_csv("test2.csv")
Wall time: 5.66 s

In [287]: files = ["test1.csv", "test2.csv"]

In [288]: %time with ThreadPoolExecutor(2) as executor: results = executor.map(pd.read_csv, files)
Wall time: 3.06 s

In [289]: %time df1.to_parquet('test1.parquet')
Wall time: 939 ms

In [290]: %time df2.to_parquet('test2.parquet')
Wall time: 917 ms

In [291]: %time df1 = pd.read_parquet("test1.parquet"); df2 = pd.read_parquet("test2.parquet")
Wall time: 317 ms

In [292]: !ls -lh test*
-rw-r--r-- 1 Randy 197609 360K Aug 30  2017 test.png
-rw-r--r-- 1 Randy 197609 369M May 27 21:53 test1.csv
-rw-r--r-- 1 Randy 197609 158M May 27 21:54 test1.parquet
-rw-r--r-- 1 Randy 197609 369M May 27 21:53 test2.csv
-rw-r--r-- 1 Randy 197609 158M May 27 21:55 test2.parquet
...