объекты пула не могут быть переданы между процессами или засолены - PullRequest
0 голосов
/ 27 февраля 2020

У меня есть следующий класс (сокращенный для краткости), который стандартизирует строки SMILES при создании экземпляра. Я пытался ускорить процесс, используя все мои процессоры с параллельной обработкой, используя преимущества многопроцессорного пакета в Python 3.7.4.

class Standardiser(object):

    def __call__(self):
        return self.prepare_dataset()

    def __init__(self, DataFrame):
        self.DataFrame = DataFrame
        self.standardiser = mv.Standardizer()
        self.salt_remover = SaltRemover()
        self.accepted_atoms = ['H','C','N','O','F','S','Cl','Br','I','P']
        self.pool = mp.Pool(processes = mp.cpu_count())

    def prepare_dataset(self, standardise = True, remove_charge = False):

        standard_smiles = []

        if standardise:

            standardised_smiles = [self.pool.apply_async(self.standardise_compound, args = (x,)).get() for x in self.DataFrame['Molecule']]           

            DataFrame = pd.concat([self.DataFrame[['Activity','Molecule']], pd.Series(standardised_smiles)], axis = 1)

            return DataFrame

    def standardise_compound(self, mol, min_heavy_atoms = 0, max_heavy_atoms = 50, max_len = 150, remove_charge = False):

        try:
            if selected_fragment is None:
                return None

            if remove_charge:
                mol = remove_charge_mol(selected_fragment)

            if min_heavy_atoms <= mol.GetNumHeavyAtoms() <= max_heavy_atoms:
                smiles = Chem.MolToSmiles(selected_fragment, isomericSmiles = False, canonical = True)   

                if len(smiles) <= max_len:
                    return smiles

        except Exception as e:
            print(e)

Я создаю его экземпляр с соответствующим DataFrame и затем я вызываю его, но мне выдается следующая ошибка:

NotImplementedError                       Traceback (most recent call last)
<ipython-input-60-1c181cd43d85> in <module>()
      1 standardise = Standardiser(df[:100])
----> 2 dff = standardise()
      3 dff.head()

<ipython-input-59-a6677d6c7724> in __call__(self)
      4 
      5     def __call__(self):
----> 6         return self.prepare_dataset()
      7 
      8     def __init__(self, DataFrame):

<ipython-input-59-a6677d6c7724> in prepare_dataset(self, standardise, remove_charge)
     22 
---> 23             standardised_smiles = [self.pool.apply(self.standardise_compound, args = (x,)).get() for x in self.DataFrame['Molecule']]
     24 
     25             DataFrame = pd.concat([self.DataFrame[['Activity','Molecule']], pd.Series(standardised_smiles)], axis = 1)

<ipython-input-59-a6677d6c7724> in <listcomp>(.0)
     22 
---> 23             standardised_smiles = [self.pool.apply(self.standardise_compound, args = (x,)).get() for x in self.DataFrame['Molecule']]
     24 
     25             DataFrame = pd.concat([self.DataFrame[['Activity','Molecule']], pd.Series(standardised_smiles)], axis = 1)

~/.conda/envs/dalkeCourse/lib/python3.6/multiprocessing/pool.py in apply(self, func, args, kwds)
    257         '''
    258         assert self._state == RUN
--> 259         return self.apply_async(func, args, kwds).get()
    260 
    261     def map(self, func, iterable, chunksize=None):

~/.conda/envs/dalkeCourse/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    642             return self._value
    643         else:
--> 644             raise self._value
    645 
    646     def _set(self, i, obj):

~/.conda/envs/dalkeCourse/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    422                         break
    423                     try:
--> 424                         put(task)
    425                     except Exception as e:
    426                         job, idx = task[:2]

~/.conda/envs/dalkeCourse/lib/python3.6/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

~/.conda/envs/dalkeCourse/lib/python3.6/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53 

~/.conda/envs/dalkeCourse/lib/python3.6/multiprocessing/pool.py in __reduce__(self)
    526     def __reduce__(self):
    527         raise NotImplementedError(
--> 528               'pool objects cannot be passed between processes or pickled'
    529               )
    530 

NotImplementedError: pool objects cannot be passed between processes or pickled

в классе не происходит травление, и мне было интересно, есть ли какие-либо проблемы с реализацией многопроцессорной обработки.

РЕДАКТИРОВАТЬ Я преобразовал функцию standardise_compound в метод @classmethod, и выданная ошибка изменилась на:

standardise_mol() missing 1 required positional argument: 'mol'
...