Распараллеливание fastText.get_sentence_vector с dask дает ошибку травления - PullRequest
0 голосов
/ 25 апреля 2020

Я пытался получить встраивание предложений fastText для 80 миллионов сообщений в Твиттере sh с использованием механизма распараллеливания с использованием dask, как описано в этом ответе: Как распараллелить apply () на Pandas Кадры данных, использующие все ядра на одной машине?

Вот мой полный код:

import dask.dataframe as dd
from dask.multiprocessing import get
import fasttext
import fasttext.util
import pandas as pd

print('starting langage: ' + 'en')
lang_output = pd.DataFrame()
lang_input = full_input.loc[full_input.name == 'en'] # 80 Million English tweets
ddata = dd.from_pandas(lang_input, npartitions = 96)
print('number of lines to compute: ' + str(len(lang_input)))
fasttext.util.download_model('en', if_exists='ignore')  # English
ft = fasttext.load_model('cc.'+'en'+'.300.bin')
fasttext.util.reduce_model(ft, 20)
lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')
print('finished en')

Это функция get_fasttext_sentence_embedding :

def get_fasttext_sentence_embedding(row, ft):
    if pd.isna(row):
        return np.zeros(20)
    return ft.get_sentence_vector(row)

Но я получаю сообщение об ошибке на этой строке:

lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')

Это ошибка, которую я получаю:

TypeError: can't pickle fasttext_pybind.fasttext objects

Есть ли способ распараллелить модель fastText get_sentence_vector с dask (или что-нибудь еще? еще)? Мне нужно распараллелить, потому что получение вложений предложений для 80 миллионов твитов занимает два больших времени, и одна строка моего фрейма данных полностью независима от другой.

1 Ответ

1 голос
/ 26 апреля 2020

Проблема здесь в том, что fasttext объекты, по-видимому, не могут быть обработаны, и Dask не знает, как сериализовать и десериализовать эту структуру данных без травления.

Самый простой способ использования Dask здесь ( но, вероятно, не самый эффективный), чтобы каждый процесс определял саму модель ft, что позволило бы избежать ее переноса (и, таким образом, избежать попыток травления). Что-то вроде следующего будет работать. Обратите внимание, что ft определено внутри функции, отображаемой между разделами.

Сначала приведем несколько примеров данных.

import dask.dataframe as dd
import fasttext
import pandas as pd
import dask
import numpy as np

df = pd.DataFrame({"text":['this is a test sentence', None, 'this is another one.', 'one more']})
ddf = dd.from_pandas(df, npartitions=2)
ddf

Dask DataFrame Structure:
text
npartitions=2   
0   object
2   ...
3   ...
Dask Name: from_pandas, 2 tasks

Затем мы можем настроить ваши функции, чтобы определить ft в каждом обработать. Это дублирует усилия, но устраняет необходимость переноса модели. При этом мы можем плавно запустить его через map_partitions.

def get_embeddings(sent, model):
    return model.get_sentence_vector(sent) if not pd.isna(sent) else np.zeros(10)

def func(df):
    ft = fasttext.load_model("amazon_review_polarity.bin") # arbitrary model
    res = df['text'].apply(lambda x: get_embeddings(x, model=ft))
    return res

ddf['sentence_vector'] = ddf.map_partitions(func)
ddf.compute(scheduler='processes')

text    sentence_vector
0   this is a test sentence [-0.01934033, 0.03729743, -0.04679677, -0.0603...
1   None    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
2   this is another one.    [-0.0025579212, 0.0353713, -0.027139299, -0.05...
3   one more    [-0.014522496, 0.10396308, -0.13107553, -0.198...

Обратите внимание, что эта вложенная структура данных (список в столбце), вероятно, не является оптимальным способом обработки этих векторов, но она будет зависеть от ваш вариант использования. Кроме того, вероятно, есть способ выполнить это вычисление в пакетах, используя fastext вместо одной строки за раз (в Python), но я не очень хорошо разбираюсь в нюансах fastext.

...