Python Мультипроцессинг большого фрейма данных в Linux - PullRequest
0 голосов
/ 14 октября 2019

Как показано в заголовке, у меня есть большой фрейм данных (df), который нужно обрабатывать построчно, так как df большой (6 ГБ), я хочу использовать пакет multiprocessingPython, чтобы ускорить его, ниже приведен игрушечный пример, учитывая мое умение писать и сложность задания, я кратко опишу, чего хочу достичь, и подробно расскажу о коде.

Исходные данныеdf, из которого я хочу выполнить некоторый построчный анализ (порядок не имеет значения), который требует не только самого фокального ряда, но и других рядов, которые удовлетворяют определенным условиям. Ниже приведены данные игрушки и мой код,

import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time
import math

# a test example
start_time = time.time()
df = pd.DataFrame({'value': np.random.randint(0, 10, size=30),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3,
                   'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})

df['row_id'] = df.index

print(df)

    value district region  row_id
0       8    upper      A       0
1       4    upper      A       1
2       0    upper      A       2
3       3    upper      A       3
4       0    upper      A       4
5       0     down      A       5
6       3     down      A       6
7       7     down      A       7
8       1     down      A       8
9       7     down      A       9
10      7    upper      B      10
11      3    upper      B      11
12      9    upper      B      12
13      8    upper      B      13
14      2    upper      B      14
15      4     down      B      15
16      5     down      B      16
17      3     down      B      17
18      5     down      B      18
19      3     down      B      19
20      3    upper      C      20
21      1    upper      C      21
22      3    upper      C      22
23      0    upper      C      23
24      3    upper      C      24
25      2     down      C      25
26      0     down      C      26
27      1     down      C      27
28      1     down      C      28
29      0     down      C      29

. Я хочу добавить еще два столбца count_b и count_a, которые просто подсчитывают количество строк, попадающих в диапазон (значение - 2, значение) и (значение, значение + 2) в одном и том же подмножестве region и district, например, count_b для строки row_id==0 должно быть 0, поскольку в пределах region=='A' и * нет строк1017 * имеет значение 7, попадающее в (8-2, 8). Таким образом, желаемый результат должен быть:

   count_a count_b region row_id
0        0       0      A      0
1        0       1      A      1
2        0       0      A      2
3        1       0      A      3
4        0       0      A      4
5        1       0      A      5
6        0       0      A      6
7        0       0      A      7
8        0       1      A      8
9        0       0      A      9
10       1       0      B     10
11       0       1      B     11
12       0       1      B     12
13       1       1      B     13
14       1       0      B     14
15       2       2      B     15
16       0       1      B     16
17       1       0      B     17
18       0       1      B     18
19       1       0      B     19
20       0       0      C     20
21       0       1      C     21
22       0       0      C     22
23       1       0      C     23
24       0       0      C     24
25       0       2      C     25
26       2       0      C     26
27       1       2      C     27
28       1       2      C     28
29       2       0      C     29

вопрос 1: можно ли векторизовать такую ​​задачу?

вопрос 2: как мы можем использовать multiprocessing, чтобы ускорить его ( решено )?

Я решил пойти с multiprocessing, потому что яне уверен, как это сделать с помощью векторизации. Решение (на основе ответа, предоставленного)

многопроцессорная

def b_a(input_df,r_d):
    print('length of input dataframe: ' + str(len(input_df)))
    # print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
    sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]

    print('length of sliced dataframe: ' + str(len(sub_df)))

    print(r_d[0],r_d[1])


    b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])

    for id in sub_df['row_id']:
        print('processing row: ' + str(id))
        focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
        temp_b = sub_df.loc[
            (sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
        temp_a = sub_df.loc[
            (sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]

        if len(temp_a):
            temp_a['count_a'] = temp_a['row_id'].count()
        else:
            temp_a = temp_a.append(pd.Series(), ignore_index=True)
            temp_a = temp_a.reindex(
                columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
            print(temp_a)

        if len(temp_b):
            temp_b['count_b'] = temp_b['row_id'].count()
        else:
            temp_b = temp_b.append(pd.Series(), ignore_index=True)
            temp_b = temp_b.reindex(
                columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
        print(len(temp_a),len(temp_b))

        temp_b.drop_duplicates('count_b', inplace=True)
        temp_a.drop_duplicates('count_a', inplace=True)
        temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
                          temp_a[['count_a']].reset_index(drop=True)], axis=1)

        temp['row_id'] = id
        temp['region'] = str(r_d[0])

        b_a = pd.concat([b_a, temp])

    return b_a

r_d_list = list(itertools.product(df['region'].unique(),df['district'].unique()))


if __name__ == '__main__':
    P = Pool(3)
    out = P.starmap(b_a, zip([chunks[r_d_list.index(j)] for j in r_d_list for i in range(len(j))],
                             list(itertools.chain.from_iterable(r_d_list)))) # S3

    # out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list)) # S2
    # out = P.starmap(b_a,zip(df,r_d_list)) # S1

    # print(out)
    P.close()
    P.join()
    final = pd.concat(out, ignore_index=True)
    print(final)

    final.to_csv('final.csv',index=False)
print("--- %s seconds ---" % (time.time() - start_time))

Поскольку для использования P.starmap (а также P.map) требуется один, чтобы подпитать функцию всеми возможными пары аргумента для b_a, решение S1 не будет работать, так как zip(df,r_d_list) фактически производит zip между именами столбцов df и элементами в r_d_list, что затем приведет к ошибке AttributeError: 'str' object has no attribute 'loc', потому что input_df для функции b_a является буквально строкой (имя столбца df), что можно проверить, посмотрев на вывод print('length of input dataframe: ' + str(len(input_df))), который даст длину имен столбцов input_df (вэто дело df). Принятый ответ исправляет это путем создания ссылочного массива (S2) (не уверен, что именно это), который имеет ту же длину, что и список параметров (r_d_list). Это решение прекрасно работает, но может быть медленным, когда df велико, поскольку, по моему личному пониманию, оно требует поиска по всему фрейму данных для каждой пары параметров (region и distrcit), поэтому я придумал измененныйверсия, которая разбивает данные на порции на основе region и distrcit, а затем выполняет поиск в каждом порции вместо всего фрейма данных (S3). Для меня это решение повышает производительность на 20 процентов с точки зрения времени выполнения, см. Ниже код:

region = df['region'].unique()

chunk_numbers = 3

chunk_region = math.ceil(len(region) / chunk_numbers)

chunks = list()

r_d_list = list()

row_count = 0

for i in range(chunk_numbers):

    print(i)

    if i < chunk_numbers-1:
        regions = region[(i*chunk_region):((i+1)*chunk_region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions,temp['district'].unique())))

        del temp

    else:
        regions = region[(i * chunk_region):len(region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions,temp['district'].unique())))

        del temp

    row_count = row_count + len(chunks[i])
    print(row_count)

добавьте это значение между print(df) и def b_a() и не забудьте закомментировать r_d_list = ... до if __name__ == '__main__'.

Спасибо за это замечательное сообщество, теперь у меня есть работоспособное решение, я обновил свой вопрос, чтобы предоставить некоторые материалы для тех, кто может столкнуться с той же проблемой в будущем, а также чтобы лучше сформулировать вопрос, чтобы получить дажелучшие решения.

Ответы [ 3 ]

0 голосов
/ 14 октября 2019

Изменить

out = P.starmap(b_a,zip(df,r_d_list))

на

out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list))

Вывод выглядит следующим образом:

length of input dataframe: 300
region: B district: down
length of input dataframe: 300
region: C district: upper
length of sliced dataframe: 50
length of input dataframe: 300
region: C district: down
length of sliced dataframe: 50
length of sliced dataframe: 50
6
[  count_a count_b region row_id
0       6       7      A      0,   count_a count_b region row_id
0       2       4      A     50,   count_a count_b region row_id
0       1       4      B    100,   count_a count_b region row_id
0       7       4      B    150,   count_a count_b region row_id
0       4       9      C    200,   count_a count_b region row_id
0       4       4      C    250]

Массив df содержит ссылки :

dfa = [df for i in range(len(r_d_list))]

for i in dfa:
    print(['id(i): ', id(i)])

Вывод вышеприведенного выглядит следующим образом:

['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]

Разница между zip(df, r_d_list) и zip(dfa, r_d_list):

Просмотрите example на zip в https://docs.python.org/3.3/library/functions.html#zip, чтобы понять, что делает zip и как он создает результат.

list(zip(df, r_d_list)) возвращает следующее:

[
('value', ('A', 'upper')),
('district', ('A', 'down')),
('region', ('B', 'upper')),
('row_id', ('B', 'down'))
]

list(zip(dfa, r_d_list)) возвращает следующее:

[
(fa, ('A', 'upper')),
(fa, ('A', 'down')),
(fa, ('B', 'upper')),
(fa, ('B', 'down'))
]

Здесь вы можете найти пример на pool.starmap at Python multiprocessing pool.map для нескольких аргументов .

Обновлен рабочий код :

import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool

df = pd.DataFrame({'value': np.random.randint(0, 10, size=300),
                   'district': (['upper'] * 50 + ['down'] * 50) * 3,
                   'region': ['A'] * 100 + ['B'] * 100 + ['C'] * 100})

df['row_id'] = df.index

# b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])


# solution 2: multi processing
def b_a(input_df, r_d):
#    print('length of input dataframe: ' + str(len(input_df)))
#    print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))

    sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]  # subset data that in certain region and district

#    print('length of sliced dataframe: ' + str(len(sub_df)))

    b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])  # an empty data frame to store result

    for id in sub_df['row_id']:
        focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
        temp_b = sub_df.loc[
            (sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
        temp_a = sub_df.loc[
            (sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]

        if len(temp_a):
            temp_a['count_a'] = temp_a['row_id'].count()
        else:
            temp_a = temp_a.reindex(
                columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)

        if len(temp_b):
            temp_b['count_b'] = temp_b['row_id'].count()
        else:
            temp_b = temp_b.reindex(
                columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)

        temp_b.drop_duplicates('count_b', inplace=True)
        temp_a.drop_duplicates('count_a', inplace=True)
        temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
                          temp_a[['count_a']].reset_index(drop=True)], axis=1)

        temp['row_id'] = id
        temp['region'] = str(r_d[0])

        b_a = pd.concat([b_a, temp])

    return b_a


r_d_list = list(itertools.product(df['region'].unique(), df['district'].unique()))

# dfa = [df for i in range(len(r_d_list))]

#for i in dfa:
#    print(['id(i): ', id(i)])

if __name__ == '__main__':
    P = Pool(3)
    out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list))
    # print(len(out))
    P.close()
    P.join()

    final = pd.concat(out, ignore_index=True)

    print(final)

Вывод для final:

    count_a count_b region row_id
0         4       6      A      0
1         5       4      A      1
2       NaN       5      A      2
3         5       8      A      3
4         5     NaN      A      4
..      ...     ...    ...    ...
295       2       7      C    295
296       6     NaN      C    296
297       6       6      C    297
298       5       5      C    298
299       6       6      C    299

[300 rows x 4 columns]
0 голосов
/ 15 октября 2019

Я думаю, здесь есть место для улучшений. Я предлагаю вам определить функцию в groupby

import os
import pandas as pd
import numpy as np
import dask.dataframe as dd
N = 30_000
# Now the example is reproducible
np.random.seed(0)
df = pd.DataFrame({'value': np.random.randint(0, 10, size=N),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3000,
                   'region': ['A'] * 10_000 + ['B'] * 10_000 + ['C'] * 10_000,
                   'row_id': np.arange(N)})

Следующая функция возвращает count_a и count_b для каждой строки в данной группе

def fun(vec):
    out = []
    for i, v in enumerate(vec):
        a = vec[:i] + vec[i+1:]
        count_a = np.isin(a, [v-2, 2]).sum()
        count_b = np.isin(a, [v, v+2]).sum()
        out.append([count_a, count_b])
    return out

Pandas

%%time
df[["count_a", "count_b"]] = df.groupby(["district", "region"])["value"]\
                               .apply(lambda x: fun(x))\
                               .explode().apply(pd.Series)\
                               .reset_index(drop=True)
CPU times: user 22.6 s, sys: 174 ms, total: 22.8 s
Wall time: 22.8 s

Dask

Теперь вам нужно создать заново df, а затем вы можете использовать dask. Вот первое, что пришло мне в голову. Наверняка есть лучший / более быстрый способ.

ddf = dd.from_pandas(df, npartitions=os.cpu_count())

df[["count_a", "count_b"]] = ddf.groupby(["district", "region"])["value"]\
                                .apply(lambda x: fun(x.tolist()),
                                       meta=('x', 'f8'))\
                                .compute(scheduler='processes')\
                                .explode().apply(pd.Series)\
                                .reset_index(drop=True)
CPU times: user 6.92 s, sys: 114 ms, total: 7.04 s
Wall time: 13.4 s

Многопроцессорная обработка

В этом случае снова необходимо создать df. И здесь уловка состоит в том, чтобы разбить df на список lst из df с.

import multiprocessing as mp
def parallelize(fun, vec, cores):
    with mp.Pool(cores) as p:
        res = p.map(fun, vec)
    return res

def par_fun(d):
    d = d.reset_index(drop=True)
    o = pd.DataFrame(fun(d["value"].tolist()),
                     columns=["count_a", "count_b"])
    return pd.concat([d,o], axis=1)
%%time
lst = [l[1] for l in list(df.groupby(["district", "region"]))]

out = parallelize(par_fun, lst, os.cpu_count())
out = pd.concat(out, ignore_index=True)
CPU times: user 152 ms, sys: 49.7 ms, total: 202 ms
Wall time: 5 s

В конечном итоге вы можете улучшить свою функцию fun, используя numba,

0 голосов
/ 14 октября 2019

Из-за многопроцессорной обработки GIL фактически не используются два разных потока. В процессах, связанных с процессором, использование многопроцессорности не даст вам такой большой, если вообще когда-либо, дополнительной производительности.

Существует библиотека под названием dask , в которой API-интерфейс похож на pandas. но под капотом он выполняет много асинхронных и чанковых операций, и это не ускоряет обработку фреймов данных ставок.

...