Как реализовать параллельный процесс на огромном фрейме данных - PullRequest
0 голосов
/ 04 июля 2019

Теперь у меня есть один огромный фрейм данных "all_in_one",

all_in_one.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8271066 entries, 0 to 8271065
Data columns (total 3 columns):
label    int64
text     object
type     int64
dtypes: int64(2), object(1)
memory usage: 189.3+ MB
all_in_one.sample(2)

enter image description here

Мне нужно запустить сегментацию слов в столбце "текст"этот фрейм данных.

import jieba
import re

def jieba_cut(text):
    text_cut = list(filter(lambda x: re.match("\w", x),
                            jieba.cut(text)))
    return text_cut
%%time
all_in_one['seg_text'] = all_in_one.apply(lambda x:jieba_cut(x['text']),axis = 1)
CPU times: user 1h 18min 14s, sys: 55.3 s, total: 1h 19min 10s
Wall time: 1h 19min 10s

Итого этот процесс занял более 1 часа.Я хочу параллельно выполнить сегментацию слов на фрейме данных и сократить время выполнения.Пожалуйста, оставьте сообщение.

РЕДАКТИРОВАТЬ:

Удивительно, когда я использовал dask для реализации функции выше.

all_in_one_dd = dd.from_pandas(all_in_one, npartitions=10)
%%time
all_in_one_dd.head()
CPU times: user 4min 10s, sys: 2.98 s, total: 4min 13s
Wall time: 4min 13s

1 Ответ

1 голос
/ 04 июля 2019

Я бы предложил, если вы работаете с пандами и хотите поработать над какой-либо формой параллельной обработки, я предлагаю вам использовать dask. Это пакет Python, который имеет тот же API, что и pandas dataframes, поэтому в вашем примере, если у вас есть файл csv с именем file.csv, вы можете сделать что-то вроде:

Вам нужно будет выполнить некоторые настройки для клиента dask и указать, сколько рабочих вы хотите и сколько ядер использовать.

import dask.dataframe as dd
from dask.distributed import Client
import jieba

def jieba_cut(text):
    text_cut = list(filter(lambda x: re.match("\w", x),
                            jieba.cut(text)))
    return text_cut

client = Client() # by default, it creates the same no. of workers as cores on your local machine

all_in_one = dd.read_csv('file.csv') # This has almost the same kwargs as a pandas.read_csv

all_in_one = all_in_one.apply(jieba_cut) # This will create a process map

all_in_one = all_in_one.compute() # This will execute all the processes

Забавно, вы можете получить доступ к панели инструментов, чтобы увидеть все процессы, выполняемые dask (я думаю, по умолчанию это localhost:8787)

...