многопроцессорность с текстовой или пространственной - PullRequest
1 голос
/ 09 октября 2019

Я пытаюсь ускорить обработку больших списков текстов через распараллеливание текстовых сообщений. Когда я использую пул из многопроцессорной обработки, полученный текстовый корпус выходит пустым. Я не уверен, заключается ли проблема в том, как я использую текстовую или многопроцессорную парадигму? Вот пример, который иллюстрирует мою проблему:

import spacy
import textacy
from multiprocessing import Pool

texts_dict={
"key1":"First text 1."
,"key2":"Second text 2."
,"key3":"Third text 3."
,"key4":"Fourth text 4."
}

model=spacy.load('en_core_web_lg')

# this works

corpus = textacy.corpus.Corpus(lang=model)

corpus.add(tuple([value, {'key':key}],) for key,value in texts_dict.items())

print(corpus) # prints Corpus(4 docs, 8 tokens)
print([doc for doc in corpus])

# now the same thing with a worker pool returns empty corpus

corpus2 = textacy.corpus.Corpus(lang=model)

pool = Pool(processes=2) 
pool.map( corpus2.add, (tuple([value, {'key':key}],) for key,value in texts_dict.items()) )

print(corpus2) # prints Corpus(0 docs, 0 tokens)
print([doc for doc in corpus2])

# to make sure we get the right data into corpus.add
pool.map( print, (tuple([value, {'key':key}],) for key,value in texts_dict.items()) )

Текстовая обработка основана на простоте. Spacy не поддерживает многопоточность, но, предположительно, должен работать в нескольких процессах. https://github.com/explosion/spaCy/issues/2075

Согласно великому намеку на @constt https://stackoverflow.com/a/58317741/4634344 сбор результатов в корпус работает для наборов данных, таких как n_docs = 10273 n_sentences = 302510 n_tokens = 2053129.

Для большего набора данных (16000 документов 3ММ токенов) я получаю следующую ошибку:

result_corpus=corpus.get() 
  File "<string>", line 2, in get
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Unserializable message: Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 283, in serve_client
    send(msg)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

´ Я буду исследовать, но если у вас есть прямое решение - очень признателен!

1 Ответ

1 голос
/ 10 октября 2019

Из-за того, что процессы python выполняются в отдельных пространствах памяти, вы должны разделить ваш объект corpus между процессами в пуле. Для этого вам нужно обернуть объект corpus в разделяемый класс, который вы зарегистрируете в классе BaseManager . Вот как можно реорганизовать свой код, чтобы он работал:

#!/usr/bin/python3
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

import spacy
import textacy


texts = {
    'key1': 'First text 1.',
    'key2': 'Second text 2.',
    'key3': 'Third text 3.',
    'key4': 'Fourth text 4.',
}


class PoolCorpus(object):

    def __init__(self):
        model = spacy.load('en_core_web_sm')
        self.corpus = textacy.corpus.Corpus(lang=model)

    def add(self, data):
        self.corpus.add(data)

    def get(self):
        return self.corpus


BaseManager.register('PoolCorpus', PoolCorpus)


if __name__ == '__main__':

    with BaseManager() as manager:
        corpus = manager.PoolCorpus()

        with Pool(processes=2) as pool:
            pool.map(corpus.add, ((v, {'key': k}) for k, v in texts.items()))

        print(corpus.get())

Вывод:

Corpus(4 docs, 16 tokens)
...