Я пытаюсь взять два массива: «день 1»: от 0 до 11 (с шагом +1) и «день 2:» от 11 до 0 (с -1) и сложить их. Тем не менее, я хочу использовать многопроцессорные и dask-массивы для ускорения процесса (позже я собираюсь увеличить их). Я хочу разделить день 1 и день 2 на четыре равные части (день 1: [0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11] и день 2: [11, 10, 9], [8, 7, 6], [5, 4, 3], [2, 1, 0]) и имеют четыре процесса для добавления работы над каждым последующим массивом (т. Е. Day1's [ 0, 1, 2] со дня 2 [11, 10, 9] и получите [11, 11, 11]. После того, как все четыре процесса завершены, я надеюсь вернуться обратно в один большой список [11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11]. Однако в рамках функции, выделенной жирным шрифтом, код не запускается и застревает в бесконечном цикле или каких-либо вычислениях.
Код:
import numpy as np
import dask.array as da
from dask import delayed
import threading
import multiprocessing as mp
NUM_WORKERS = 4
# create list from 0 to 11
day1 = list(range(12))
# create list form 11 to 0
day2 = day1[::-1]
def get_sum(i, base):
z = []
x = day1[i * length: i * length + length]
y = day2[i * length: i * length + length]
z.append(x)
z.append(y)
converted = da.from_array(z, chunks = NUM_WORKERS)
**summed = da.sum(converted, axis = 0).compute()**
list_concatenate = np.concatenate((base, summed), axis=0)
all_sum = sum(list_concatenate)
process_list = []
for i in range(NUM_WORKERS):
process_list = mp.Process(target = get_sum, args = (i, process_list))
process_list.start()
process_list.join()