Как надежно убедиться, что определенное количество процессов каждый выполняет часть кода один раз? - PullRequest
0 голосов
/ 17 января 2019

У меня есть некоторый код, где у меня есть число x процессов, основанных на одной и той же функции, выполняемой вместе, и мне нужно надежно (100% времени), чтобы каждый из указанных процессов выполнял определенную часть кода только один раз. Это жизненно важно для всего моего проекта.

Я пытался использовать счетчик в качестве значения многопроцессорности и увеличивать его каждый раз, когда процесс выполнял указанную часть кода, которая не работала, поскольку некоторые процессы выполнялись два или даже три раза, прежде чем все процессы выполняли код один раз. , Я также попытался использовать многопроцессорный массив логических значений и установить для него значение True после того, как процесс сделал код, и хотя это было лучше, у меня все еще есть проблема, когда иногда (иногда этого достаточно, чтобы это классифицировалось как не работающее), все, кроме одного. процесс запуска. Смотрите ниже, где я сейчас нахожусь. (Обратите внимание, что все импорты и аргументы используются для других частей моего проекта).

import random
import sys
from threading import Thread
from multiprocessing import Value, Process, Lock, Event, Array, Pipe, Queue
import os
import time
from operator import itemgetter
import subprocess

# Arbitrary coefficients according to whether or not a home is equipped with a renewable energy source
renew_season_coef = [[1, 1, 1, 1], 
                [1.23, 1.3, 1.1, 1.08],
                [1.07, 1.09, 1.24, 1.27]]
def home(number_of_homes, ID, renewable_energy, policy, queue_lock, counter_lock, write_lock, home_queue, market_queue, energy_exchange_queue, home_counter, market_ready, temperature, season, weather_ready, clock_ready):
    home_id = ID + 1
    net_worth = 0

    while clock_ready.wait(3 * 2): # Wait for the clock at most 3 * delay seconds

        market_flag = False # Boolean to identify when homes are ready to send their final energy values to the market
        deficit_flag = False # Flag to identify as being in deficit of energy 

        weather_ready.wait()
        consumption = random.gauss(30.0, 3.0) + temperature[1] # Daily energy consumption of a normal household is approx 30 kWh
        production = random.gauss(30.0, 3.0) # Equivalent production to even out the energy balance of each home

        current_season = int(season[1]) # Fetch the current season determined in weather()
        production *= renew_season_coef[renewable_energy][current_season] # Production is influenced by homes having a source of renewable energy and the season
        energy_balance = production - consumption

        while not home_counter[ID]: # and market_queue.qsize() < number_of_homes:

            with queue_lock: # Acquire queue_lock to put the energy_balance in the market_queue
                market_queue.put([home_id, energy_balance])
                energy_exchange_queue.put([home_id, energy_balance])

            if energy_balance < 0:
                with queue_lock:
                    home_queue.put([home_id, energy_balance])
                with counter_lock:
                    home_counter[ID] = True

            else:
                with counter_lock:
                    home_counter[ID] = True

        clock_ready.clear()
        weather_ready.clear()

        # for i in range(number_of_homes):
            # home_counter[i] = False

        for i in range(energy_exchange_queue.qsize()):
            print(energy_exchange_queue.get())

Вот главное где я запускаю домашние процессы

if __name__ == "__main__":

    start = time.time()

    number_of_homes = int(sys.argv[1])
    number_of_days = int(sys.argv[2])

    queue_lock = Lock()
    counter_lock = Lock()
    write_lock = Lock()

    home_queue = Queue()
    market_queue = Queue()
    energy_exchange_queue = Queue()

    clock_ready = Event()
    weather_ready = Event()

    home_counter = Array('b', number_of_homes)

    for i in range(number_of_homes):
        home_counter[i] = False

    market_ready = Value('b', False)
    temperature = Array('f', 2)
    season = Array('f', 2)

    console_connection, market_connection = Pipe()
    console_connection_2, weather_connection = Pipe()

    homes = []

    for i in range(number_of_homes):
        renewable_energy = random.randint(0, 2)
        # 0 = normal home, 1 = solar, 2 = wind
        policy = random.randint(0, 2)
        h = Process(target = home, args = (number_of_homes, i, renewable_energy, policy, queue_lock, counter_lock, write_lock, home_queue, market_queue, energy_exchange_queue, home_counter, market_ready, temperature, season, weather_ready, clock_ready,))
        h.start()
        homes.append(h)
for h in homes:
    h.join()
print("Simulation duration :", round((time.time() - start), 3), "seconds")

В большинстве случаев это работает правильно и выводит правильное количество значений, но мне действительно нужно найти более надежный способ сделать это. В большинстве случаев этого недостаточно.

...