Синхронизация синхронизированных параллельных процессов на разных частотах - PullRequest
0 голосов
/ 02 февраля 2020

Я пытаюсь смоделировать различные задачи, выполняющиеся параллельно в Python, и каждый параллельный процесс выполняется на разных частотах (например, 200 Гц, 100 Гц и 50 Гц). Я использовал код из этого вопроса , чтобы создать класс Timer для запуска этих процессов в режиме реального времени, но процессы не синхронизируются во времени (например, три задачи по 200 Гц могут иногда выполняться между двумя 100 Хз задачи).

Для синхронизации своих процессов я делаю счетчики тиков в их общей памяти. Каждая итерация процесса 200 Гц увеличивает счетчик, а затем ожидает его сброса на 0, когда счетчик достигает 2, в то время как каждая итерация процесса 100 Гц ожидает, что этот счетчик достигнет 2, прежде чем его сбросить. То же самое для процесса 50 Гц, но с другим счетчиком. Я использую метод while / pass для ожидания.

Вот код:

from multiprocessing import Process, Event, Value
import time

# Add Timer class for multiprocessing
class Timer(Process):
    def __init__(self, interval, iteration, function, args=[], kwargs={}):
        super(Timer, self).__init__()
        self.interval = interval
        self.iteration = iteration
        self.iterationLeft = self.iteration
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        startTimeProcess = time.perf_counter()
        while self.iterationLeft > 0:
            startTimePeriod = time.perf_counter()
            self.function(*self.args, **self.kwargs)
            # print(self.interval-(time.clock() - startTimePeriod))
            self.finished.wait(self.interval-(time.perf_counter() - startTimePeriod))
            self.iterationLeft -= 1
        print(f'Process finished in {round(time.perf_counter()-startTimeProcess, 5)} seconds')


def func0(id, freq, tick_p1):
    # Wait for 2 runs of Process 1 (tick_p1)
    while tick_p1.value < 2:
        pass
    tick_p1.value = 0   # Reset tick_p1

    # Add fake computational time depending on the frequency of the process
    print(f'id: {id} at {freq} Hz') 
    if freq == 400:
        time.sleep(0.002)
    elif freq == 200:
        time.sleep(0.003)
    elif freq == 100:
        time.sleep(0.007)
    elif freq == 50:
        time.sleep(0.015)



def func1(id, freq, tick_p1, tick_p2):
    # Wait for tick_p1 to have been reset by Process0
    while tick_p1.value >= 2:
        pass
    # Wait for 2 runs of Process 2 (tick_p2)
    while tick_p2.value < 2:
        pass
    tick_p2.value = 0   # Reset tick_p2

    # Add fake computational time depending on the frequency of the process
    print(f'id: {id} at {freq} Hz') 
    if freq == 400:
        time.sleep(0.002)
    elif freq == 200:
        time.sleep(0.003)
    elif freq == 100:
        time.sleep(0.007)
    elif freq == 50:
        time.sleep(0.015)

    # Increment tick_p1
    tick_p1.value += 1


def func2(id, freq, tick_p2):
    # Wait for tick_p2 to have been reset by Process1
    while tick_p2.value >= 2:
        pass

    # Add fake computational time depending on the frequency of the process
    print(f'id: {id} at {freq} Hz') 
    if freq == 400:
        time.sleep(0.002)
    elif freq == 200:
        time.sleep(0.003)
    elif freq == 100:
        time.sleep(0.007)
    elif freq == 50:
        time.sleep(0.015)

    # Increment tick_p2
    tick_p2.value += 1



if __name__ == '__main__':
    freqs = [50,100,200]
    # freqs = [0.25,0.5,1]
    Tf = 10

    tick_p1 = Value('i', 1)
    tick_p2 = Value('i', 1)  

    processes = []
    p0 = Timer(interval=1/freqs[0], iteration=round(Tf*freqs[0]), function = func0, args=(0,freqs[0], tick_p1))
    p1 = Timer(interval=1/freqs[1], iteration=round(Tf*freqs[1]), function = func1, args=(1,freqs[1], tick_p1, tick_p2))
    p2 = Timer(interval=1/freqs[2], iteration=round(Tf*freqs[2]), function = func2, args=(2,freqs[2], tick_p2))
    processes.append(p0)
    processes.append(p1)
    processes.append(p2)

    start = time.perf_counter()
    for process in processes:
        process.start()   

    for process in processes:
        process.join()

    finish = time.perf_counter()

    print(f'Finished in {round(finish-start, 5)} seconds')

Как вы можете видеть, я добавил время ожидания в процессах, чтобы смоделировать время вычислений , Когда я удаляю команды печати в процессах, сценарию требуется 10,2 секунды времени выполнения для имитации 10 секунд вычислений в режиме реального времени (увеличение на 2%, что приемлемо).

Мой вопрос: это лучший способ добиться того, что я пытаюсь сделать? Есть ли лучший / более быстрый способ?

Спасибо!

1 Ответ

0 голосов
/ 04 февраля 2020

Я нашел более чистый способ сделать это, но я все еще открыт для других предложений.

По сути, вместо того, чтобы ждать момент для выполнения следующей итерации, я создал планировщик, который указывает, когда нужно запускать цикл для каждого процесса (с использованием общих значений).

Это быстрые процессы (скажем, p2, работающие с частотой 400 Гц), и все остальные процессы должны быть медленнее кратного частоты ( скажем p1 и p0 200 и 100 Гц).

Вместо ожидания подходящего момента для поднятия флага выполнения (с wait() или sleep()), планировщик зацикливается с while l oop и проверяет, закончился ли период p2. Если условие выполнено, оно вызывает p2Flag и перезапускает период. Каждый процесс имеет свой собственный флаг, и флаги более медленных процессов поднимаются в соответствии со счетчиком, который увеличивается каждый период p2. Если с момента последнего вызова p1 было выполнено 2 шага p2, этот планировщик будет «ждать» завершения p1, прежде чем поднять флаг p2 и p1.

Это немного сложно, но это гарантирует, что более медленные машины будут получать те же результаты, что и машины, которые могут запускать это в режиме реального времени.

from multiprocessing import Process, Value
import time

def func0(id, freq, endFlag, p0Flag, runIdx, Ts):
    while (endFlag.value == 0):
        if (p0Flag.value == 1):
            t = round(runIdx.value*Ts, 4)

            # Add fake computational time depending on the frequency of the process
            # print(f'id: {id} at {freq} Hz at {t}s') 
            if freq == 400:
                time.sleep(0.002)
            elif freq == 200:
                time.sleep(0.003)
            elif freq == 100:
                time.sleep(0.007)
            elif freq == 50:
                time.sleep(0.015)

            # Lower flag to confirm completion of cycle
            p0Flag.value = 0


def func1(id, freq, endFlag, p1Flag, runIdx, Ts):
    while (endFlag.value == 0):
        if (p1Flag.value == 1):
            t = round(runIdx.value*Ts, 4)

            # Add fake computational time depending on the frequency of the process
            # print(f'id: {id} at {freq} Hz at {t}s') 
            if freq == 400:
                time.sleep(0.002)
            elif freq == 200:
                time.sleep(0.003)
            elif freq == 100:
                time.sleep(0.007)
            elif freq == 50:
                time.sleep(0.015)

            # Lower flag to confirm completion of cycle
            p1Flag.value = 0


def func2(id, freq, endFlag, p2Flag, runIdx, Ts):
    while (endFlag.value == 0):
        if (p2Flag.value == 1):
            t = round(runIdx.value*Ts, 4)

            # Add fake computational time depending on the frequency of the process
            # print(f'id: {id} at {freq} Hz at {t}s') 
            if freq == 500:
                time.sleep(0.0015)
            elif freq == 400:
                time.sleep(0.002)
            elif freq == 200:
                time.sleep(0.003)
            elif freq == 100:
                time.sleep(0.007)
            elif freq == 50:
                time.sleep(0.015)

            # Update time for next iteration
            runIdx.value += 1
            # Lower flag to confirm completion of cycle
            p2Flag.value = 0



if __name__ == '__main__':
    # Set frequencies of processes
    # Last value of freqs is the fastest one, for process p2
    freqs = [50,100,200]    # Hz
    freqs = [100,200,400]   # Hz
    # freqs = [0.25,0.5,1]  # Hz
    Tf = 10
    Ts = round(1/freqs[-1], 4)

    # Create shared values for time index (runIdx)
    # Various flags to trigger the execution of the code in each process (p0Flag, ...)
    # A flag to en all processes
    runIdx = Value('I',0)
    p0Flag = Value('b', 0)
    p1Flag = Value('b', 0)
    p2Flag = Value('b', 0)
    endFlag = Value('b', 0)

    # How many times the fastest process has to run before flagging the slower processes
    p0_counter_exe = freqs[-1]/freqs[0]
    p1_counter_exe = freqs[-1]/freqs[1]

    if (not(freqs[-1] % freqs[0] == 0) or not(freqs[-1] % freqs[1] == 0)):
        raise Exception("Update rates for processes must be a multiple of the dynamic's update rate.")
    if (freqs[-1] < freqs[0]) or (freqs[-1] < freqs[1]):
        raise Exception("Dynamics update rate must be the fastest.")

    # p2 is at fastest frequency, p1 and p0 at lower frequencies
    p0 = Process(target=func0, args=(0, freqs[0], endFlag, p0Flag, runIdx, Ts))
    p1 = Process(target=func1, args=(1, freqs[1], endFlag, p1Flag, runIdx, Ts))
    p2 = Process(target=func2, args=(2, freqs[2], endFlag, p2Flag, runIdx, Ts))
    processes = []
    processes.append(p0)
    processes.append(p1)
    processes.append(p2)

    for process in processes:
        process.start()   
    time.sleep(0.5)

    # Start subprocesse's counters to execute directly at the first timestep
    p0_counter = p0_counter_exe
    p1_counter = p1_counter_exe

    # Scheduler
    #------------
    startTime  = time.perf_counter()
    periodEnd = time.perf_counter()
    while (runIdx.value*Ts < Tf):
        periodTime = time.perf_counter()-periodEnd
        do_p2 = False

        # Wait for new timestep AND completion of p2
        if (periodTime >= Ts and p2Flag.value == 0):

            # If p0 or p1 are expected to finish before the new timestep, wait for their completion
            # Depending on the situation, if slower processes have finished their cycle, make do_p2 True
            if (p1_counter == p1_counter_exe) and (p0_counter == p0_counter_exe):
                if (p1Flag.value == 0) and (p0Flag.value == 0):
                    do_p2 = True
            elif (p1_counter == p1_counter_exe):
                if (p1Flag.value == 0):
                    do_p2 = True
            elif (p0_counter == p0_counter_exe):
                if (p0Flag.value == 0):
                    do_p2 = True
            else:
                do_p2 = 1

            # If do_p2 is True, raise p2Flag for the p2 process
            if (do_p2):
                periodEnd = time.perf_counter()
                p2Flag.value = 1

                # If it's time to start a cycle for the slower processes, raise their flag and reset their counter
                if (p1_counter == p1_counter_exe):
                    p1Flag.value = 1
                    p1_counter = 0 
                if (p0_counter == p0_counter_exe):
                    p0Flag.value = 1
                    p0_counter = 0

                # Increment slower processes counter
                p1_counter += 1
                p0_counter += 1


    # Close all processes
    endFlag.value = 1

    for process in processes:
        process.join()

    print(f'Finished in {round(time.perf_counter()-startTime, 5)} seconds')
    print(Ts*runIdx.value)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...