Joblib не вызывает пользовательскую функцию, когда n_jobs> 1 - PullRequest
1 голос
/ 29 апреля 2020

У меня есть пример с данными.

Как видно из кода, каждый вызов функции fit_by_idx() должен выводить 'here', но это не так. Все нормально когда n_jobs=1, но если n_jobs больше, чем joblib не вызывает функцию.

Код:

import statsmodels.tsa.holtwinters as holtwinters
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

train = pd.read_csv('train.csv').drop(columns=['id'])


def iter_predict(data, model, steps, fit_args=[],  fit_kwargs={}): # steps - кол. предсказываемых точек
    def fit_by_idx(idx):
        print('here')
        endog = data.iloc[idx]
        fitted = model(endog).fit(*fit_args, optimized=False, **fit_kwargs)\
        res[idx, :] = fitted.forecast(steps)

    res = np.zeros((data.shape[0], steps))
    Parallel(n_jobs=2)(delayed(fit_by_idx)(idx) for idx in range(data.shape[0]))
    return res

iter_predict(train, holtwinters.SimpleExpSmoothing, 2, fit_kwargs={'smoothing_level': 0.5})

А вот ссылка на набор данных .

1 Ответ

0 голосов
/ 30 апреля 2020

Q : ", если n_jobs больше, чем joblib не вызывает функцию ."

Что ж, да (вы можете проверить номера PID и PPID),
это просто не представляет результат print( "here" )

Использование определения из Документация по API:

print( *objects, sep = ' ', end = '\n', file = sys.stdout, flush = False ) начать с принудительного выполнения flush = True

Тем не менее, в будущем возникнут другие проблемы. joblib - порождает (если не вызвано иное, за счет отрицательного влияния на производительность, если вернуться к выполнению чистого [SERIAL], управляемого GIL повторного [SERIAL] кода для любого n_jobs, запускающего снова один -шаг-за-другим-за-другим, что не имеет смысла, поскольку вы оплачиваете все расходы на создание экземпляров и другие накладные расходы, но при этом не получаете никакой выгоды от ускорения, не так ли?). Используя:

def iter_preDEMO( data,            # Pandas DF-alike data
                  #other args removed for MCVE-clarity
                  ):

    def fit_by_idx( idx ): #-------------------------------------[FUNCTION]-def-<start> To be transferred to each remote-joblib-initiated process(es)

        print( 'here[{0:_>4d}(PPID:PID={1:_>7d}:{2::>7d})]'.format( idx,
                                                                    os.getppid(), # test joblib-[FUNCTION]-def-transfer here with: lambda x = "_{0:}_" : x.format( os.getppid() )
                                                                    os.getpid()   # test joblib-[FUNCTION]-def-transfer here with: lambda x = "_{0:}_" : x.format( os.getpid()  )
                                                                    ),
                end   = "\t",
                flush = True
                )
    #------------------------------------------------------------[FUNCTION]-def-<end>

    res = np.zeros( ( data.shape[0], 3 ) )
    for aBackEND in ( 'threading', 'loky', 'multiprocessing' ):
        try:
             print( "\n____________________________Going into ['{0:}']-backend".format( aBackEND ) )
             with parallel_backend( aBackEND, n_jobs = N_JOBS ):
                  Parallel( n_jobs = N_JOBS )( delayed( fit_by_idx )( pickled_SER_DES_copy_of_idx )
                                               for                    pickled_SER_DES_copy_of_idx in range( data.shape[0] )
                                               )
        finally:
             print( "\n_____________________________Exit from ['{0:}']-backend".format( aBackEND ) )
    return res

, вы увидите, как все это работает, используя более подробные print() -веденные результаты

START: PID=_____22528

____________________________Going into ['threading']-backend
here[___0(PPID:PID=__22527:::22528)]    here[___1(PPID:PID=__22527:::22528)]    here[___2(PPID:PID=__22527:::22528)]    here[___3(PPID:PID=__22527:::22528)]    here[___4(PPID:PID=__22527:::22528)]    here[___5(PPID:PID=__22527:::22528)]    here[___6(PPID:PID=__22527:::22528)]    here[___7(PPID:PID=__22527:::22528)]    here[___8(PPID:PID=__22527:::22528)]    here[___9(PPID:PID=__22527:::22528)]    here[__10(PPID:PID=__22527:::22528)]    here[__11(PPID:PID=__22527:::22528)]    here[__12(PPID:PID=__22527:::22528)]    here[__13(PPID:PID=__22527:::22528)]    here[__14(PPID:PID=__22527:::22528)]    here[__15(PPID:PID=__22527:::22528)]    here[__16(PPID:PID=__22527:::22528)]    
_____________________________Exit from ['threading']-backend

____________________________Going into ['loky']-backend
here[___0(PPID:PID=__22527:::22528)]    here[___1(PPID:PID=__22527:::22528)]    here[___2(PPID:PID=__22527:::22528)]    here[___3(PPID:PID=__22527:::22528)]    here[___4(PPID:PID=__22527:::22528)]    here[___5(PPID:PID=__22527:::22528)]    here[___6(PPID:PID=__22527:::22528)]    here[___7(PPID:PID=__22527:::22528)]    here[___8(PPID:PID=__22527:::22528)]    here[___9(PPID:PID=__22527:::22528)]    here[__10(PPID:PID=__22527:::22528)]    here[__11(PPID:PID=__22527:::22528)]    here[__12(PPID:PID=__22527:::22528)]    here[__13(PPID:PID=__22527:::22528)]    here[__14(PPID:PID=__22527:::22528)]    here[__15(PPID:PID=__22527:::22528)]    here[__16(PPID:PID=__22527:::22528)]    
_____________________________Exit from ['loky']-backend

____________________________Going into ['multiprocessing']-backend
here[___0(PPID:PID=__22527:::22528)]    here[___1(PPID:PID=__22527:::22528)]    here[___2(PPID:PID=__22527:::22528)]    here[___3(PPID:PID=__22527:::22528)]    here[___4(PPID:PID=__22527:::22528)]    here[___5(PPID:PID=__22527:::22528)]    here[___6(PPID:PID=__22527:::22528)]    here[___7(PPID:PID=__22527:::22528)]    here[___8(PPID:PID=__22527:::22528)]    here[___9(PPID:PID=__22527:::22528)]    here[__10(PPID:PID=__22527:::22528)]    here[__11(PPID:PID=__22527:::22528)]    here[__12(PPID:PID=__22527:::22528)]    here[__13(PPID:PID=__22527:::22528)]    here[__14(PPID:PID=__22527:::22528)]    here[__15(PPID:PID=__22527:::22528)]    here[__16(PPID:PID=__22527:::22528)]    
_____________________________Exit from ['multiprocessing']-backend

 [[0. 0. 0.]
  [0. 0. 0.]
  ...
  ]

Также проверьте это и это на вашем O / S, соответственно. ваши действительные версии joblib и (скрытые) инструментов для травления SER / DES.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...