Как сделать кусок кода, который делает параллелизацию атрибутов в наборе данных? - PullRequest
1 голос
/ 04 марта 2020

Я работаю над симуляцией автомобильных сообщений с использованием модуля SUMO 1.3.1 и python traci в Ubuntu 18.04.4. Каждый автомобиль имеет свой собственный информационный фрейм сообщений со строками, которые являются идентификаторами других транспортных средств в симуляции, а столбцы - временем, они создаются в начале кода и заполняются нулями (это нельзя изменить, потому что это было попросил меня сделать это так).

Каждый автомобиль проверяется индивидуально, чтобы увидеть, находятся ли другие автомобили в заданном диапазоне, чтобы отправить им сообщение, и если это так, он запускает процесс, в котором программа изменяет значение в таблице получателя в соответствии с текущим временем. и идентификатор отправителя 1 или 2, в зависимости от выбранного шанса. Этот кусок кода работал очень медленно, потому что он использовал только один процессор. Чего я хочу добиться, так это использовать распараллеливание для ускорения этого процесса.

То, что я делал до сих пор, начинает использовать многопроцессорную библиотеку, проблема в том, что код теперь работает еще медленнее. Я не знаю, использовал ли я этот модуль неправильно или, может быть, есть другой модуль, более подходящий для такой ситуации.

Вот код, используемый до многопроцессорной обработки:

for car in car_vector:
                dist = np.sqrt((pos_vector_x - traci.vehicle.getPosition(car)[0])**2 + (pos_vector_y - traci.vehicle.getPosition(car)[1])**2) #creates a vector with all distances from the cars in the simulation and the car being used in the moment
                for j , receiver in enumerate(dist): #generates a list of tupples, with j being an index for the distances, j is used to know which car correspond to that distance in the car_vector
                    if receiver < 30 and receiver > 0.5: #the distance defined is 30 meters and the condition of being higher than 0.5 comes to stop the car from sending messages to itself
                        dic_frames[car] = contamination(step - 2,car_vector[j],dic_frames[car])

С функция загрязнения:

def contamination(time,receiver,data_msgs): #pass on the contaminated colors to the non-contaminated cars
    threshold_lost_message = 0.2
    is_critical = random.randint(1,100) == 1 
    if random.random() > threshold_lost_message:
        if is_critical:
            data_msgs.loc[data_msgs.IDs == receiver,str(time) + '00ms'] = 2 
        else:
            data_msgs.loc[data_msgs.IDs == receiver,str(time) + '00ms'] = 1 
    return data_msgs

А вот код "использования" многопроцессорной обработки:

for car in car_vector:
                processes = []
                dist = np.sqrt((pos_vector_x - traci.vehicle.getPosition(car)[0])**2 + (pos_vector_y - traci.vehicle.getPosition(car)[1])**2) #creates a vector with all distances from the cars in the simulation and the car being used in the moment
                for j , receiver in enumerate(dist): #generates a list of tupples, with j being an index for the distances, j is used to know which car correspond to that distance in the car_vector
                    p = multiprocessing.Process(target=multiprocessing_func, args=(car,j,receiver,dic_frames,car_vector,step))
                    processes.append(p)
                    p.start()
                for process in processes:
                    process.join()

с multiprocessing_func:

def multiprocessing_func(car, j, receiver, dic_frames, car_vector, step):
    if receiver < 30 and receiver > 0.5:
        dic_frames[car_vector[j]] = contamination(step - 2,car,dic_frames[car_vector[j]]) 

Is есть ли способ заставить этот процесс использовать более одного процессора, даже если он только приписывает? Возможно, сделать это для более чем одной машины одновременно.

1 Ответ

1 голос
/ 17 марта 2020

Вы можете попробовать распараллелить во внешнем l oop, потому что многопроцессорность имеет значительные накладные расходы.

def func(car,pos_vector_x,pos_vector_y,dic_frames,car_vector,step):
     dist = ...

for car in car_vector:
    processes = []
    p = multiprocessing.Process(target=func, args=(car,pos_vector_x,pos_vector_y,dic_frames,car_vector,step)
    processes.append(p)
    p.start()
for process in processes:
    process.join()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...