То, что вы просите, не совсем то, что должна обеспечивать обычная ОС. У вас есть мешающее планирование ОС, миграция ядра, разная тактовая частота с помощью термодинамики процессора, разные попадания и пропадания кэша и так далее. Можно повысить приоритеты процессов и привязать процессы к определенным ядрам (для этого посмотрите psutil ), но вряд ли вы сможете увидеть стабильные улучшения от этого. Ваша ОС обычно работает лучше, чем вы могли бы здесь.
Для действительно жестких ограничений в реальном времени вы должны изучить RTOS . Кроме того, вам придется выбрать язык среднего уровня (например, C / C ++), который позволяет тонко управлять памятью (сократить дорогостоящие промахи в процессоре-кеше). Возможно, вы все равно просите о том, что вы должны сделать по-другому ( XY проблема ), поэтому, когда я продолжаю показывать вам, как получить некоторую синхронизацию, не понимаю это как одобрение всего вашего подхода к любой проблеме, которую вы действительно пытаетесь решить здесь.
Оружие выбора здесь multiprocessing.Barrier
. Это примитив синхронизации, который позволяет указать количество исполнителей (потоков / процессов), которые должны вызывать .wait()
для барьер-экземпляра. Когда указанное число исполнителей вызвало wait()
, шлагбаум освобождает всех ожидающих исполнителей одновременно. Таким образом, все исполнители могут быть синхронизированы по такой барьерной операции.
Обратите внимание, что одной такой операции недостаточно для того, что вы просите. Факторы ОС, о которых я упоминал ранее, всегда возвращают хаос, и время процессора снова расходится с этой точки синхронизации. Это означает, что вам придется повторять синхронизацию через определенные интервалы снова и снова. Конечно, это будет стоить вам некоторой пропускной способности. Более короткие интервалы синхронизации означают меньшее расхождение в среднем.
Ниже вы видите две функции, реализующие эту технику. syncstart_foo
синхронизируется только один раз (как ответ @ blhsing), sync_foo
делает это каждые sync_interval
итерации. После выполнения всех итераций функции возвращают time.time()
родительскому элементу, где вычисляется дельта времени.
import time
from multiprocessing import Process, Barrier, Queue
def syncstart_foo(outqueue, barrier, n_iter):
barrier.wait() # synchronize only once at start
for _ in range(int(n_iter)):
pass # do stuff
outqueue.put(time.time())
def sync_foo(outqueue, barrier, n_iter, sync_interval):
for i in range(int(n_iter)):
if i % sync_interval == 0: # will sync first time for i==0
barrier.wait()
# do stuff
outqueue.put(time.time())
Вспомогательные функции для запуска теста:
def test_sync():
"""Run test for `sync_foo`."""
special_args = (SYNC_INTERVAL,)
_run_test(sync_foo, special_args)
def test_syncstart():
"""Run test for `syncstart_foo`."""
_run_test(syncstart_foo)
def _run_test(f, special_args=None):
outqueue = Queue()
barrier = Barrier(N_WORKERS)
args = (outqueue, barrier, N_ITER)
if special_args:
args += special_args
pool = [Process(target=f, args=args) for _ in range(N_WORKERS)]
print(f'starting test for {f.__name__}')
for p in pool:
p.start()
results = [outqueue.get() for _ in range(N_WORKERS)]
for p in pool:
p.join()
print(f"delta: {(abs(results[1] - results[0])) * 1e3:>{6}.{2}f} ms")
print("-" * 60)
Main-запись:
if __name__ == '__main__':
N_WORKERS = 2
N_ITER = 50e6 # 1e6 == 1M
SYNC_INTERVAL = 250_000 # synchronize every x iterations
for _ in range(5):
test_syncstart()
test_sync()
Пример вывода:
starting test for syncstart_foo
delta: 28.90 ms
------------------------------------------------------------
starting test for sync_foo
delta: 1.38 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 70.33 ms
------------------------------------------------------------
starting test for sync_foo
delta: 0.33 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 4.45 ms
------------------------------------------------------------
starting test for sync_foo
delta: 0.17 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 168.80 ms
------------------------------------------------------------
starting test for sync_foo
delta: 0.30 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 79.42 ms
------------------------------------------------------------
starting test for sync_foo
delta: 1.24 ms
------------------------------------------------------------
Process finished with exit code 0
Вы видите, что однократной синхронизации, как syncstart_foo
, недостаточно.