Различные результаты при использовании многопроцессорной и многопроцессорной.dummy - PullRequest
0 голосов
/ 08 февраля 2019

У меня есть долгосрочное задание, которое можно распараллелить.Я отлаживал код с использованием multiprocessing.dummy.Это работает хорошо, и я получаю ожидаемые результаты.Но когда я изменяю его на multiprocessing, он запускает _test функцию невероятно быстро, а фактический вывод даже не затрагивается

Работа состоит в том, чтобы заполнять панду DataFrame данными до некоторой строкипорог счета.Каждый из более длинных процессов в цикле while добавляет около 2500 строк за один прогон.Сбор данных не зависит от других процессов.

Идея состоит в том, что процессы пропускают DataFrame через очередь между собой и используют блокировку для блокировки доступа к нему со стороны других процессов, пока они работают с Dataframe.Как только их работа завершена, они кладут ее обратно и снимают блокировку.

Как только DataFrame заполнен до требуемого размера, процесс может завершиться, и другие процессы больше не требуются для завершения (но я не уверен, если они завершеныкак только они заканчиваются без join () или только того, что с ними происходит - проверка .is_alive () может заменить .join ())

Для этого примера TRAINING_DATA_LENGTH установлен только на 10k, но фактический размер будет многовыше

Проблема в том, что при переходе с multiprocessing.dummy на multiprocessing вся операция завершается за 0,7 секунды, а возвращаемый размер X равен 0


  • Может быть, есть другой способ сделать это, но я еще не знаю об этом.

  • Также мне нужно, чтобы он запускался в отдельном файле, а не __main__


test_mp.py

import pandas as pd
import multiprocessing
from multiprocessing import Process,Queue,Lock
import time
import numpy as np


TRAINING_DATA_LENGTH = 10e3

def get_training_data_mp(testing = False,updating = False):    
    s = time.time()
    processes = []
    output = Queue()
    X = pd.DataFrame([])
    output.put(X)
    lock = Lock()

    for i in range(multiprocessing.cpu_count()):           
        p = Process(target=_test,args=(testing,updating,5000,1000,lock,output))
        p.daemon = True
        p.start()
        processes.append(p)

    print([p.is_alive() for p in processes])
#    while all([p.is_alive() for p in processes]):
#        print('alive')    
#        time.sleep(3)            

    for process in processes:
        process.join()               
    print('finished')  

    X = output.get()
    e = time.time()
    print(e-s)
    return X

def _test(testing,updating,max_test_amount,max_train_amount_from_last_days,lock,output):
    time.sleep(2) # short init work

    lock.acquire()   
    X = output.get() 

    while (((not testing or updating) and X.shape[0]<TRAINING_DATA_LENGTH)     or 
           (testing and X.shape[0]<max_test_amount)):

        if updating and X.shape[0]<max_train_amount_from_last_days:
            output.put(X)
            lock.release()

            time.sleep(2) # long work
            action = '1'
        elif (testing and X.shape[0]<max_test_amount*0.25) and not updating:
            output.put(X)
            lock.release()

            time.sleep(2) # long work
            action = '2'
        else:
            output.put(X)
            lock.release()

            time.sleep(2) # long work
            action = '3'               

        time.sleep(5) # main long work
        x = pd.DataFrame(np.random.randint(0,10000,size=(2500, 4)), columns=list('ABCD')) # simulated result

        lock.acquire()
        X = output.get()
        if X.shape[0] == 0:
            X = x
        else:
            X = X.append(x)   

        # correcting output    
        X = X.drop_duplicates(keep='first')
        X.reset_index(drop=True,inplace = True)
        time.sleep(0.5) # short work

    output.put(X)    
    lock.release() 

и запустить его из другого файла

import test_mp
X = test_mp.get_training_data_mp(True)
print(X.shape[0])

с помощью multiprocessing.dummy Я получаюследующий вывод:

[True, True, True, True]
finished
17.01797342300415
12500

с multiprocessing Its:

[True, True, True, True]
finished
0.7530431747436523 # due to time.sleep() its impossible to be finished this fast
0 # expected >= TRAINING_DATA_LENGTH

1 Ответ

0 голосов
/ 09 февраля 2019

Добавление if __name__ == '__main__': в runfile заставило код выполнить и получить "некоторые" результаты.Но при дополнительном тестировании кажется, что используется только одно ядро ​​(или у меня что-то не так в коде)

import test_mp
if __name__ == '__main__':
    X = test_mp.get_training_data_mp()
    print([x.shape[0] for x in X])

test_mp.py

import multiprocessing
from multiprocessing import Process,Queue,Lock
import time
import numpy as np


TRAINING_DATA_LENGTH = 10e3

def get_training_data_mp(testing = False,updating = False): 
    s = time.time()
    processes = []
    output = Queue()
    X = []
    x = [X,X,X,X]
    output.put(x)
    lock = Lock()

    for i in range(multiprocessing.cpu_count()):           
        p = Process(target=_test,args=(i,testing,updating,5000,1000,lock,output))
        p.daemon = True
        p.start()
        processes.append(p)

    while all([p.is_alive() for p in processes]):  
        lock.acquire()
        x = output.get()
        print([len(X) for X in x])
        output.put(x)
        lock.release()
        time.sleep(3)    

    print([p.is_alive() for p in processes])
#    for process in processes:
#        process.join()               
    print('finished') 

    x = output.get()
    my_x = x

    e = time.time()
    print(e-s)
    return my_x

def _test(i,testing,updating,max_test_amount,max_train_amount_from_last_days,lock,output):
    time.sleep(2) # long work

    lock.acquire()   
    x = output.get()
    X = x[i]

    while (((not testing or updating) and len(X)<TRAINING_DATA_LENGTH) or 
           (testing and len(X)<max_test_amount)):

        x[i] = X
        output.put(x)
        lock.release()              

        y = np.array(np.random.randint(0,10000,size=(2500, 4)))
        time.sleep(2) # main long work

        lock.acquire()
        X = output.get()
        X = x[i]
        if len(X) == 0:
            X = y
        else:
            X = np.append(X,y,axis=0)   
        # correcting output    
        time.sleep(0.5) # short work
    x[i] = X        
    output.put(x)    
    lock.release()

с multiprocessing.dummy Iполучить следующий вывод:

[0, 0, 0, 0]
[5000, 0, 5000, 0]
[False, True, True, True]
finished
7.50442910194397
[10000, 7500, 7500, 2500] # All processes were obtaining data <- intended

с multiprocessing Its:

[0, 0, 0, 0]
[0, 0, 2500, 0]
[0, 10000, 0, 0]
[False, False, True, False]
finished
12.15569543838501
[0, 0, 10000, 0] # Only one process was obtaining data <- wrong

Решено

time.sleep() не нагружает процессор, но при его переключениидля функции типа

def sleep():
    n = 0
    for i in range(6000):
        n = i**i

результаты как multiprocessing.dummy, так и multiprocessing соответствуют ожидаемым - оба возвращают одинаковую длину, но многопроцессорность в N раз быстрее

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...