Как показано в заголовке, у меня есть большой фрейм данных (df
), который нужно обрабатывать построчно, так как df
большой (6 ГБ), я хочу использовать пакет multiprocessing
Python, чтобы ускорить его, ниже приведен игрушечный пример, учитывая мое умение писать и сложность задания, я кратко опишу, чего хочу достичь, и подробно расскажу о коде.
Исходные данныеdf
, из которого я хочу выполнить некоторый построчный анализ (порядок не имеет значения), который требует не только самого фокального ряда, но и других рядов, которые удовлетворяют определенным условиям. Ниже приведены данные игрушки и мой код,
import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time
import math
# a test example
start_time = time.time()
df = pd.DataFrame({'value': np.random.randint(0, 10, size=30),
'district': (['upper'] * 5 + ['down'] * 5) * 3,
'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})
df['row_id'] = df.index
print(df)
value district region row_id
0 8 upper A 0
1 4 upper A 1
2 0 upper A 2
3 3 upper A 3
4 0 upper A 4
5 0 down A 5
6 3 down A 6
7 7 down A 7
8 1 down A 8
9 7 down A 9
10 7 upper B 10
11 3 upper B 11
12 9 upper B 12
13 8 upper B 13
14 2 upper B 14
15 4 down B 15
16 5 down B 16
17 3 down B 17
18 5 down B 18
19 3 down B 19
20 3 upper C 20
21 1 upper C 21
22 3 upper C 22
23 0 upper C 23
24 3 upper C 24
25 2 down C 25
26 0 down C 26
27 1 down C 27
28 1 down C 28
29 0 down C 29
. Я хочу добавить еще два столбца count_b
и count_a
, которые просто подсчитывают количество строк, попадающих в диапазон (значение - 2, значение) и (значение, значение + 2) в одном и том же подмножестве region
и district
, например, count_b
для строки row_id==0
должно быть 0, поскольку в пределах region=='A'
и * нет строк1017 * имеет значение 7, попадающее в (8-2, 8). Таким образом, желаемый результат должен быть:
count_a count_b region row_id
0 0 0 A 0
1 0 1 A 1
2 0 0 A 2
3 1 0 A 3
4 0 0 A 4
5 1 0 A 5
6 0 0 A 6
7 0 0 A 7
8 0 1 A 8
9 0 0 A 9
10 1 0 B 10
11 0 1 B 11
12 0 1 B 12
13 1 1 B 13
14 1 0 B 14
15 2 2 B 15
16 0 1 B 16
17 1 0 B 17
18 0 1 B 18
19 1 0 B 19
20 0 0 C 20
21 0 1 C 21
22 0 0 C 22
23 1 0 C 23
24 0 0 C 24
25 0 2 C 25
26 2 0 C 26
27 1 2 C 27
28 1 2 C 28
29 2 0 C 29
вопрос 1: можно ли векторизовать такую задачу?
вопрос 2: как мы можем использовать multiprocessing
, чтобы ускорить его ( решено )?
Я решил пойти с multiprocessing
, потому что яне уверен, как это сделать с помощью векторизации. Решение (на основе ответа, предоставленного)
многопроцессорная
def b_a(input_df,r_d):
print('length of input dataframe: ' + str(len(input_df)))
# print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]
print('length of sliced dataframe: ' + str(len(sub_df)))
print(r_d[0],r_d[1])
b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])
for id in sub_df['row_id']:
print('processing row: ' + str(id))
focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
temp_b = sub_df.loc[
(sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
temp_a = sub_df.loc[
(sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]
if len(temp_a):
temp_a['count_a'] = temp_a['row_id'].count()
else:
temp_a = temp_a.append(pd.Series(), ignore_index=True)
temp_a = temp_a.reindex(
columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
print(temp_a)
if len(temp_b):
temp_b['count_b'] = temp_b['row_id'].count()
else:
temp_b = temp_b.append(pd.Series(), ignore_index=True)
temp_b = temp_b.reindex(
columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
print(len(temp_a),len(temp_b))
temp_b.drop_duplicates('count_b', inplace=True)
temp_a.drop_duplicates('count_a', inplace=True)
temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
temp_a[['count_a']].reset_index(drop=True)], axis=1)
temp['row_id'] = id
temp['region'] = str(r_d[0])
b_a = pd.concat([b_a, temp])
return b_a
r_d_list = list(itertools.product(df['region'].unique(),df['district'].unique()))
if __name__ == '__main__':
P = Pool(3)
out = P.starmap(b_a, zip([chunks[r_d_list.index(j)] for j in r_d_list for i in range(len(j))],
list(itertools.chain.from_iterable(r_d_list)))) # S3
# out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list)) # S2
# out = P.starmap(b_a,zip(df,r_d_list)) # S1
# print(out)
P.close()
P.join()
final = pd.concat(out, ignore_index=True)
print(final)
final.to_csv('final.csv',index=False)
print("--- %s seconds ---" % (time.time() - start_time))
Поскольку для использования P.starmap
(а также P.map
) требуется один, чтобы подпитать функцию всеми возможными пары аргумента для b_a
, решение S1
не будет работать, так как zip(df,r_d_list)
фактически производит zip между именами столбцов df
и элементами в r_d_list
, что затем приведет к ошибке AttributeError: 'str' object has no attribute 'loc'
, потому что input_df
для функции b_a
является буквально строкой (имя столбца df), что можно проверить, посмотрев на вывод print('length of input dataframe: ' + str(len(input_df)))
, который даст длину имен столбцов input_df
(вэто дело df
). Принятый ответ исправляет это путем создания ссылочного массива (S2
) (не уверен, что именно это), который имеет ту же длину, что и список параметров (r_d_list
). Это решение прекрасно работает, но может быть медленным, когда df
велико, поскольку, по моему личному пониманию, оно требует поиска по всему фрейму данных для каждой пары параметров (region
и distrcit
), поэтому я придумал измененныйверсия, которая разбивает данные на порции на основе region
и distrcit
, а затем выполняет поиск в каждом порции вместо всего фрейма данных (S3). Для меня это решение повышает производительность на 20 процентов с точки зрения времени выполнения, см. Ниже код:
region = df['region'].unique()
chunk_numbers = 3
chunk_region = math.ceil(len(region) / chunk_numbers)
chunks = list()
r_d_list = list()
row_count = 0
for i in range(chunk_numbers):
print(i)
if i < chunk_numbers-1:
regions = region[(i*chunk_region):((i+1)*chunk_region)]
temp = df.loc[df['region'].isin(regions.tolist())]
chunks.append(temp)
r_d_list.append(list(itertools.product(regions,temp['district'].unique())))
del temp
else:
regions = region[(i * chunk_region):len(region)]
temp = df.loc[df['region'].isin(regions.tolist())]
chunks.append(temp)
r_d_list.append(list(itertools.product(regions,temp['district'].unique())))
del temp
row_count = row_count + len(chunks[i])
print(row_count)
добавьте это значение между print(df)
и def b_a()
и не забудьте закомментировать r_d_list = ...
до if __name__ == '__main__'
.
Спасибо за это замечательное сообщество, теперь у меня есть работоспособное решение, я обновил свой вопрос, чтобы предоставить некоторые материалы для тех, кто может столкнуться с той же проблемой в будущем, а также чтобы лучше сформулировать вопрос, чтобы получить дажелучшие решения.