Python заправка не начинается - PullRequest
0 голосов
/ 26 мая 2020

Я написал большую программу, которая обрабатывает большой набор данных из 70k документов. Каждый документ занимает около 5 секунд, поэтому я хочу распараллелить процедуру. Код не работает, и я не могу понять почему. Я пробовал это только с одним рабочим, чтобы убедиться, что это не проблема памяти.

Код:

from doc_builder import DocBuilder
from glob import glob
from tqdm import tqdm
import threading

path = "/home/marcel/Desktop/transformers-master/examples/token-classification/CORD-19-research-challenge/document_parses/test_collection"
paths = [x for x in glob(path + '/**/*.json', recursive=True)]
workers_amount = 1

def main(paths):
    doc_builder = DocBuilder()
    for path in tqdm(paths):
        data, doc = doc_builder.get_doc(path)
        doc_builder.write_doc(path, data, doc)

threads = []
for i in range(workers_amount):
    worker_paths = paths[int((i-1/workers_amount)*len(paths)):int((i/workers_amount)*len(paths))]
    t = threading.Thread(target=main, args=[worker_paths])
    t.start()
    threads.append(t)

for t in threads:
    t.join()

Он просто случайным образом завершает выполнение через некоторое время. Потоки ЦП при запуске резко увеличиваются, но кроме этого на самом деле ничего не происходит. Что-то не так с кодом? Если это важно, я запускаю это на Ryzen 7 3700X (так что должно быть возможно 16 потоков).

/ edit: Сначала я подумал, что проблема может заключаться в том, что каждый поток инициализирует большую модель PyTorch и тренер вроде это:

self.tokenizer = AutoTokenizer.from_pretrained(self.pretrained_dir) #, cache_dir=cache_dir)
self.splitter = spacy.load(cd_dir + "/en_core_sci_md-0.2.4/en_core_sci_md/en_core_sci_md-0.2.4")
self.model = AutoModelForTokenClassification.from_pretrained(self.pretrained_dir, config=self.config_dir) #,cache_dir=cache_dir)
self.model.load_state_dict(torch.load(self.model_dir))
self.trainer = Trainer(model=self.model, args=TrainingArguments(output_dir=self.output_dir))

Это может быть разделено между потоками, поэтому мне не нужно каждый раз инициализировать новый (их инициализация очень затратна), но, как я уже сказал, я попытался использовать 1 рабочий Так что это не должно быть проблемой, верно?

1 Ответ

1 голос
/ 26 мая 2020

Может, попробуешь с помощью ThreadPoolExecutor? Снимает головную боль по управлению бассейном. Синтаксис может быть неправильным на 100%. Никогда не использовал tqdm.

from doc_builder import DocBuilder
from glob import glob
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

path = "/home/marcel/Desktop/transformers-master/examples/token-classification/CORD-19-research-challenge/document_parses/test_collection"
files = [x for x in glob(path + '/**/*.json', recursive=True)]

WORKERS_AMOUNT = 16

def main(file):
    doc_builder = DocBuilder()
    data, doc = doc_builder.get_doc(tqdm(file))
    doc_builder.write_doc(file, data, doc)

threads = []
with ThreadPoolExecutor(max_workers=WORKERS_AMOUNT) as executor:
    for file in files:
        threads.append(executor.submit(main, file))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...