Я ищу более короткие способы подготовки набора данных для задачи машинного обучения. Я обнаружил, что многопроцессорная библиотека может быть полезной. Однако, поскольку я новичок ie в многопроцессорной обработке, я не мог найти правильный путь.
Сначала я написал несколько кодов, как показано ниже:
class DatasetReader:
def __init__(self):
self.data_list = Read_Data_from_file
self.data = []
def _ready_data(self, ex, idx):
# Some complex functions that takes several minutes
def _dataset_creator(self, queue):
for idx, ex in enumerate(self.data_list):
queue.put(self._ready_data(ex, idx))
def _dataset_consumer(self, queue):
total_mem = 0.0
t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
for idx in t:
ins = queue.get()
self.data.append(ins)
gc.collect()
def _build_dataset(self):
queue = Queue()
creator = Process(target=self._dataset_creator, args=(queue,))
consumer = Process(target=self._dataset_consumer, args=(queue,))
creator.start()
consumer.start()
queue.close()
queue.join_thread()
creator.join()
consumer.join()
Однако, на мой взгляд, поскольку _dataset_creator
обрабатывает данные (здесь _ready_data
) последовательным образом , это не поможет сократить потребление времени.
Итак, я изменил код для генерации нескольких процессов, которые обрабатывают один элемент данных:
class DatasetReader:
def __init__(self):
self.data_list = Read_Data_from_file
self.data = []
def _ready_data(self, ex, idx):
# Some complex functions that takes several minutes
def _dataset_creator(self, ex, idx, queue):
queue.put(self._ready_data(ex, idx))
def _dataset_consumer(self, queue):
total_mem = 0.0
t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
for idx in t:
ins = queue.get()
self.data.append(ins)
gc.collect()
def _build_dataset(self):
queue = Queue()
for idx, ex in enumerate(self.data_list):
p = Process(target=self._dataset_creator, args=(ex, idx, queue,))
p.start()
consumer = Process(target=self._dataset_consumer, args=(queue,))
consumer.start()
queue.close()
queue.join_thread()
consumer.join()
Однако это возвращает мне ошибки:
Process Process-18:
Traceback ~~~
RuntimeError: can't start new thread
Traceback ~~~
OSError: [Errno 12] Cannot allocate memory
Не могли бы вы мне помочь? обрабатывать сложные данные параллельно?
РЕДАКТИРОВАТЬ 1:
Благодаря @tdelaney я могу сократить потребление времени, генерируя self.num_worker
процессов (16 в моем эксперименте):
def _dataset_creator(self, pid, queue):
for idx, ex in list(enumerate(self.data_list))[pid::self.num_worker]:
queue.put(self._ready_data(ex, idx))
def _dataset_consumer(self, queue):
t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
for _ in t:
ins = queue.get()
self.data[ins['idx']] = ins
def _build_dataset(self):
queue = Queue()
procs = []
for pid in range(self.num_worker):
p = Process(target=self._dataset_creator, args=(pid, queue,))
procs.append(p)
p.start()
consumer = Process(target=self._dataset_consumer, args=(queue,))
consumer.start()
queue.close()
queue.join_thread()
for p in procs:
p.join()
consumer.join()