Как синхронизировать процессы Python на уровне миллисекунд? - PullRequest
4 голосов
/ 29 марта 2019

Я пытаюсь запустить две функции Python на двух ядрах точно одновременно. Каждый процесс запускает очень длинный цикл (теоретически бесконечный цикл). Важно, чтобы они оставались синхронизированными в одно и то же время, даже малейшая задержка может вызвать проблемы в долгосрочной перспективе.

Я думаю, что моя проблема в том, что я запускаю их последовательно, как это

# define the processes and assign them  functions
first_process = multiprocessing.Process(name='p1', target='first_function')
second_process = multiprocessing.Process(name='p2', target='second_function')

# start the processes
first_process.start()
second_process.start()

Я напечатал time.time() в начале каждой функции, чтобы измерить разницу во времени. Выходной результат составил:

first function time: 1553812298.9244068
second function time: 1553812298.9254067

разница составляет 0.0009999275207519531 секунды. Как упоминалось ранее, эта разница окажет значительное влияние на долгосрочную перспективу.

Подводя итог, как запустить две функции на двух разных ядрах в точно одновременно? Если Python не способен сделать это, какие еще варианты я должен проверить?

Ответы [ 2 ]

5 голосов
/ 29 марта 2019

Вы можете назначить объект multiprocessing.Queue для каждого из процессов, и в начале функции для процесса поместить элемент в очередь для другого процесса с помощью multiprocessing.Queue.put, а затем немедленно попытаться удалить его собственныйочередь с multiprocessing.Queue.get.Поскольку multiprocessing.Queue.get блокируется до тех пор, пока в очереди не появится элемент, это эффективно синхронизирует два процесса:

import multiprocessing
import time

def func(queue_self, queue_other):
    queue_other.put(None)
    queue_self.get()
    print(time.time())

q1 = multiprocessing.Queue()
q2 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=func, args=(q1, q2))
p2 = multiprocessing.Process(target=func, args=(q2, q1))

if __name__ == '__main__':
    p1.start()
    p2.start()

Пример вывода:

1553814412.7520192
1553814412.7520192
3 голосов
/ 29 марта 2019

То, что вы просите, не совсем то, что должна обеспечивать обычная ОС. У вас есть мешающее планирование ОС, миграция ядра, разная тактовая частота с помощью термодинамики процессора, разные попадания и пропадания кэша и так далее. Можно повысить приоритеты процессов и привязать процессы к определенным ядрам (для этого посмотрите psutil ), но вряд ли вы сможете увидеть стабильные улучшения от этого. Ваша ОС обычно работает лучше, чем вы могли бы здесь.

Для действительно жестких ограничений в реальном времени вы должны изучить RTOS . Кроме того, вам придется выбрать язык среднего уровня (например, C / C ++), который позволяет тонко управлять памятью (сократить дорогостоящие промахи в процессоре-кеше). Возможно, вы все равно просите о том, что вы должны сделать по-другому ( XY проблема ), поэтому, когда я продолжаю показывать вам, как получить некоторую синхронизацию, не понимаю это как одобрение всего вашего подхода к любой проблеме, которую вы действительно пытаетесь решить здесь.


Оружие выбора здесь multiprocessing.Barrier. Это примитив синхронизации, который позволяет указать количество исполнителей (потоков / процессов), которые должны вызывать .wait() для барьер-экземпляра. Когда указанное число исполнителей вызвало wait(), шлагбаум освобождает всех ожидающих исполнителей одновременно. Таким образом, все исполнители могут быть синхронизированы по такой барьерной операции.

Обратите внимание, что одной такой операции недостаточно для того, что вы просите. Факторы ОС, о которых я упоминал ранее, всегда возвращают хаос, и время процессора снова расходится с этой точки синхронизации. Это означает, что вам придется повторять синхронизацию через определенные интервалы снова и снова. Конечно, это будет стоить вам некоторой пропускной способности. Более короткие интервалы синхронизации означают меньшее расхождение в среднем.

Ниже вы видите две функции, реализующие эту технику. syncstart_foo синхронизируется только один раз (как ответ @ blhsing), sync_foo делает это каждые sync_interval итерации. После выполнения всех итераций функции возвращают time.time() родительскому элементу, где вычисляется дельта времени.

import time
from multiprocessing import Process, Barrier, Queue


def syncstart_foo(outqueue, barrier, n_iter):
    barrier.wait() # synchronize only once at start
    for _ in range(int(n_iter)):
        pass # do stuff
    outqueue.put(time.time())


def sync_foo(outqueue, barrier, n_iter, sync_interval):
    for i in range(int(n_iter)):
        if i % sync_interval == 0: # will sync first time for i==0
            barrier.wait()
        # do stuff
    outqueue.put(time.time())

Вспомогательные функции для запуска теста:

def test_sync():
    """Run test for `sync_foo`."""
    special_args = (SYNC_INTERVAL,)
    _run_test(sync_foo, special_args)


def test_syncstart():
    """Run test for `syncstart_foo`."""
    _run_test(syncstart_foo)


def _run_test(f, special_args=None):

    outqueue = Queue()
    barrier = Barrier(N_WORKERS)

    args = (outqueue, barrier, N_ITER)
    if special_args:
        args += special_args

    pool = [Process(target=f, args=args) for _ in range(N_WORKERS)]

    print(f'starting test for {f.__name__}')
    for p in pool:
        p.start()

    results = [outqueue.get() for _ in range(N_WORKERS)]

    for p in pool:
        p.join()

    print(f"delta: {(abs(results[1] - results[0])) * 1e3:>{6}.{2}f} ms")
    print("-" * 60)

Main-запись:

if __name__ == '__main__':

    N_WORKERS = 2
    N_ITER = 50e6  # 1e6 == 1M
    SYNC_INTERVAL = 250_000  # synchronize every x iterations

    for _ in range(5):
        test_syncstart()
        test_sync()

Пример вывода:

starting test for syncstart_foo
delta:  28.90 ms
------------------------------------------------------------
starting test for sync_foo
delta:   1.38 ms
------------------------------------------------------------
starting test for syncstart_foo
delta:  70.33 ms
------------------------------------------------------------
starting test for sync_foo
delta:   0.33 ms
------------------------------------------------------------
starting test for syncstart_foo
delta:   4.45 ms
------------------------------------------------------------
starting test for sync_foo
delta:   0.17 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 168.80 ms
------------------------------------------------------------
starting test for sync_foo
delta:   0.30 ms
------------------------------------------------------------
starting test for syncstart_foo
delta:  79.42 ms
------------------------------------------------------------
starting test for sync_foo
delta:   1.24 ms
------------------------------------------------------------

Process finished with exit code 0

Вы видите, что однократной синхронизации, как syncstart_foo, недостаточно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...