Многопроцессорная обработка Python, не может повторяться с pool.map - PullRequest
0 голосов
/ 31 мая 2019

У меня 32-ядерный компьютер с оперативной памятью 256 ГБ, и я новичок в параллельных вычислениях.Я должен запустить эту строку:

Flag=data.flag [:,:,x]

, которая является очень большой матрицей.«х» относится к подматрице, которую я хочу сохранить в файл.Но «меняется» 64 раза, и каждая итерация занимает + - 8 минут, поэтому необходимо распараллелить ее

Я рассмотрел несколько примеров: https://www.machinelearningplus.com/python/parallel-processing-python/ и https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming

Функция для кода:

def multi_flagger(antenna_no):
    recv=data.corr_products[antenna_no][0]         # Gets the antenna name and polarization value
    flagger=data.flags[:, :, antenna_no]         # Flags the data based on the antenna value
    mat_flag = np.matrix(flagger)                # Morphs the 2d array into a matrix
    np.save('Flagged_data_'+str(recv)+'_.npy', mat_flag)  # Saves the data to a file

pool.map(multi_flagger, for i in range(2))
pool.close()

Я получаю эту ошибку:

File "<ipython-input-58-e1a6f9779b9a>", line 1
   pool.map(multi_flagger, for i in range(2))
                             ^ SyntaxError: invalid syntax`

Я хотел бы записать 64 файла .npyна диск

Ответы [ 2 ]

1 голос
/ 31 мая 2019

Метод Pool.map(func, iterable) ожидает итерацию, которая может быть tuple или list. Вы не можете передать цикл for как то, что вы пробовали (for i in range(2)), но вместо этого вы можете передать список чисел из диапазона :

pool.map(multi_flagger, list(range(2)))  # [0, 1]

Я не уверен насчет остальной части вашего кода, но при использовании Pool вы устанавливаете количество рабочих процессов при создании экземпляра Pool (как объяснено в Использование пула рабочие пример), а затем вызовите map, чтобы передать функцию для выполнения и входные данные функции:

def multi_flagger(antenna_no):
    recv=data.corr_products[antenna_no][0]         
    flagger=data.flags[:, :, antenna_no]         
    mat_flag = np.matrix(flagger)                    
    np.save('Flagged_data_'+str(recv)+'_.npy', mat_flag)

with Pool(processes=3) as pool:               # set the number of worker processes
    pool.map(multi_flagger, list(range(2)))   # pass a list of antenna_no
    pool.close()
0 голосов
/ 04 июня 2019

поэтому вопрос заключался в особой упаковке под названием «катдал», используемой в радиотелескопах. Я разобрался с решением, это было из-за линии флага. Данные нужно было читать каждый раз

def multi_flagger(antenna_no): data = katdal.open('/'+prefix+'/'+fname+'/'+fname+'/'+fname+'_sdp_l0.full.rdb')<br> recv=data.corr_products[antenna_no][0] # Gets the antenna and polarization value flagger=data.flags[:, :, antenna_no] # Flags the data based on the antenna value np.save('Flagged_data_'+str(recv)+'_.npy', np.matrix(flagger) ) # Morphs the 2d array into a matrix and Saves the data to a file

Но это оставляет меня с новой проблемой, связанной с тем, что файл данных читается постоянно, по существу занимая время и пространство.

...