задержка, вызванная толстым аргументом в многопроцессорной обработке Python - PullRequest
0 голосов
/ 31 мая 2019

Я использую многопроцессорность в Python для параллелизма некоторых вычислительно-тяжелых функций.но я обнаружил, что при создании толстого аргумента существует задержка в процессе создания (например, график networkx из 1000 нот или список из 1000000 элементов).Я экспериментирую на двух многопроцессорных модулях «multiprocessing» и «pathos», получаю схожие результаты.Мой вопрос заключается в том, как избежать такой задержки, потому что она разрушает преимущества параллельных вычислений.

в моем примере кода, я просто передаю толстый аргумент функции для многопроцессорной обработки - тело функции не касаетсяАргумент как все.

  1. пример кода с использованием «многопроцессорной обработки»
import multiprocessing
import time

def f(args):
    (x, conn, t0, graph) = args
    ans = 1
    x0 = x
    t = time.time() - t0
    conn.send('factorial of %d: start@%.2fs' % (x0, t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    conn.send('factorial of %d: finish@%.2fs, res = %d' %(x0, t, ans))
    return ans

def main():
    var = (4, 8, 12, 20, 16)
    p = multiprocessing.Pool(processes = 4)
    p_conn, c_conn = multiprocessing.Pipe()
    params = []
    t0 = time.time()
    N = 1000
    import networkx as nx
    G = nx.complete_graph(N, nx.DiGraph())

    import random
    for (start, end) in G.edges:
        G.edges[start, end]['weight'] = random.random()

    for i in var:
        params.append((i, c_conn, t0, G))
    res = list(p.imap(f, params))
    p.close()
    p.join()

    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var, t, res))

if __name__ == '__main__':
    main()

вывод приведенного выше примера кода

output:
factorial of 4: start@29.78s
factorial of 4: finish@31.29s, res = 24
factorial of 8: start@53.56s
factorial of 8: finish@57.07s, res = 40320
factorial of 12: start@77.25s
factorial of 12: finish@82.75s, res = 479001600
factorial of 20: start@100.39s
factorial of 20: finish@109.91s, res = 2432902008176640000
factorial of 16: start@123.55s
factorial of 16: finish@131.05s, res = 20922789888000
factorial of (4, 8, 12, 20, 16)@131.06s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

Process finished with exit code 0

в соответствии свышеупомянутый вывод, есть приблизительно 24-секундные задержки между двумя процессами, создающими

пример кода с использованием «пафоса»
import pathos
import multiprocess
import time

def f(x, conn, t0, graph):
    ans = 1
    x0 = x
    t = time.time() - t0
    conn.send('factorial of %d: start@%.2fs' % (x0, t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    conn.send('factorial of %d: finish@%.2fs, res = %d' %(x0, t, ans))
    return ans

def main():
    var = (4, 8, 12, 20, 16)
    p = pathos.multiprocessing.ProcessPool(nodes=4)
    p_conn, c_conn = multiprocess.Pipe()
    t0 = time.time()
    conn_s = [c_conn] * len(var)
    t0_s = [t0] * len(var)
    N = 1000
    import networkx as nx
    G = nx.complete_graph(N, nx.DiGraph())

    import random
    for (start, end) in G.edges:
        G.edges[start, end]['weight'] = random.random()

    res = list(p.imap(f, var, conn_s, t0_s, [G] * len(var)))

    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var, t, res))

if __name__ == '__main__':
    main()

вывод приведенного выше примера кода,

output:
factorial of 4: start@29.63s
factorial of 4: finish@31.13s, res = 24
factorial of 8: start@53.50s
factorial of 8: finish@57.00s, res = 40320
factorial of 12: start@76.94s
factorial of 12: finish@82.44s, res = 479001600
factorial of 20: start@100.72s
factorial of 20: finish@110.23s, res = 2432902008176640000
factorial of 16: start@123.69s
factorial of 16: finish@131.20s, res = 20922789888000
factorial of (4, 8, 12, 20, 16)@131.20s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

Process finished with exit code 0

аналогично, согласно приведенному выше выводу, существует около24-секундная задержка между созданием двух процессов.

Если я уменьшу размер графика (меньшее число узлов), задержка соответственно уменьшится.Я предполагаю, что это связано с дополнительным временем, используемым для травления / добавления графа networkx в качестве аргумента.в идеале, первые 4 процесса должны быть созданы одновременно.как избежать этой стоимости?спасибо!


ОБНОВЛЕНИЕ

Благодаря любезному ответу Александра я удалил канал в кодах "multiprocessing" и "pathos".«многопроцессорный» код работает как Александр - задержка уменьшена до 1 секунды, но «пафосный» код все еще имеет задержку более 20 секунд.исправленный «пафосный» код выложен ниже:

import pathos
import multiprocess
import time
from pympler import asizeof
import sys



def f(args):
    (x, graph) = args
    t = time.ctime()
    print('factorial of %d: start@%s' % (x, t))
    time.sleep(4)
    return x


def main():
    t0 = time.time()
    params = []

    var = (4, 8, 12, 20, 16)
    p = pathos.multiprocessing.ProcessPool(nodes=4)
    N = 1000
    import networkx as nx
    G = nx.complete_graph(N, nx.DiGraph())

    import random
    for (start, end) in G.edges:
        G.edges[start, end]['weight'] = random.random()

    print('Size of G by sys', sys.getsizeof(G), 'asizeof', asizeof.asizeof(G))
    print('G created in %.2f' %  (time.time() - t0))

    for i in var:
        params.append((i, G))
    res = list(p.imap(f, params))
    p.close()
    p.join()

if __name__ == '__main__':
    main()

вывод идет как

Size of G by sys 56 asizeof 338079824
G created in 17.36
factorial of 4: start@Fri May 31 11:39:26 2019
factorial of 8: start@Fri May 31 11:39:53 2019
factorial of 12: start@Fri May 31 11:40:19 2019
factorial of 20: start@Fri May 31 11:40:44 2019
factorial of 16: start@Fri May 31 11:41:10 2019

Process finished with exit code 0

Ответы [ 2 ]

1 голос
/ 31 мая 2019

Этот толстый аргумент (338 МБ) должен быть скопирован в отдельную память при создании каждого процесса, но это не должно занять столько времени (24 секунды).

Вот как это работает на моем компьютере:

  • Программа зависает в conn.send. Проблема с кодом (1.) в multiprocess.Pipe (). From https://docs.python.org/3.4/library/multiprocessing.html?highlight=process "... Обратите внимание, что данные в канале могут быть повреждены, если два процесса (или потоки) попытаются выполнять чтение или запись в один и тот же конец канала одновременно."

Итак, я изменил код:

import multiprocessing
import os
import time
import sys
from pympler import asizeof
import networkx as nx
import random

def factorial(args):
    (x, t, graph) = args
    s0 = '# pid %s x %2d' % (format(os.getpid()), x)
    s1 = 'started @ %.2f' % (time.time() - t)
    print(s0, s1)
    f = 1
    while x > 1:
        f *= x
        x -= 1
        time.sleep(0.5)
    s2 = 'ended   @ %.2f' % (time.time() - t)
    print(s0, s2, f)
    return s0, s1, s2, f

if __name__ == '__main__':
    t0 = time.time()
    N = 1000
    G = nx.complete_graph(N, nx.DiGraph())
    for (start, end) in G.edges:
        G.edges[start, end]['weight'] = random.random()
    print('Size of G by sys', sys.getsizeof(G), 'asizeof', asizeof.asizeof(G))
    print('G created in %.2f' %  (time.time() - t0))
    t0 = time.time()
    p = multiprocessing.Pool(processes=4)
    outputs = list(p.imap(factorial, [(i, t0, G) for i in (4, 8, 12, 20, 16)]))
    print('output:')
    for output in outputs:
        print(output)

Вывести сейчас:

Size of G by sys 56 asizeof 338079824
G created in 13.03
# pid 2266 x  4 started @ 1.27
# pid 2267 x  8 started @ 1.98
# pid 2268 x 12 started @ 2.72
# pid 2266 x  4 ended   @ 2.77 24
# pid 2269 x 20 started @ 3.44
# pid 2266 x 16 started @ 4.09
# pid 2267 x  8 ended   @ 5.49 40320
# pid 2268 x 12 ended   @ 8.23 479001600
# pid 2266 x 16 ended   @ 11.60 20922789888000
# pid 2269 x 20 ended   @ 12.95 2432902008176640000
output:
('# pid 2266 x  4', 'started @ 1.27', 'ended   @ 2.77', 24)
('# pid 2267 x  8', 'started @ 1.98', 'ended   @ 5.49', 40320)
('# pid 2268 x 12', 'started @ 2.72', 'ended   @ 8.23', 479001600)
('# pid 2269 x 20', 'started @ 3.44', 'ended   @ 12.95', 2432902008176640000)
('# pid 2266 x 16', 'started @ 4.09', 'ended   @ 11.60', 20922789888000)

338 МБ данных создаются за 11 секунд, и, да, для запуска первых 4 процессов требуется время. Задержки между запусками гораздо меньше: 0,71, 0,74, 0,72 секунды. У меня iMac с Intel i5 @ 3,2 ГГц.

Наибольшее значение N, когда нет видимой задержки, равно 78:

Size of G by sys 56 asizeof 1970464
G created in 0.08
# pid 2242 x  4 started @ 0.01
# pid 2243 x  8 started @ 0.01
# pid 2244 x 12 started @ 0.01
# pid 2245 x 20 started @ 0.01
# pid 2242 x  4 ended   @ 1.51 24
# pid 2242 x 16 started @ 1.53
# pid 2243 x  8 ended   @ 3.52 40320
# pid 2244 x 12 ended   @ 5.52 479001600
# pid 2242 x 16 ended   @ 9.04 20922789888000
# pid 2245 x 20 ended   @ 9.53 2432902008176640000
output:
('# pid 2242 x  4', 'started @ 0.01', 'ended   @ 1.51', 24)
('# pid 2243 x  8', 'started @ 0.01', 'ended   @ 3.52', 40320)
('# pid 2244 x 12', 'started @ 0.01', 'ended   @ 5.52', 479001600)
('# pid 2245 x 20', 'started @ 0.01', 'ended   @ 9.53', 2432902008176640000)
('# pid 2242 x 16', 'started @ 1.53', 'ended   @ 9.04', 20922789888000)
0 голосов
/ 01 июня 2019

Я изменил N на 50 и запустил «пафосный» код с помощью отладчика в PyCharm.Остановился после «G создан в 7.79».Вывод ниже подтвердил мое подозрение о том, почему он медленнее с «пафосом».Pathos использует объекты подключения и сокета (в зависимости от платформы) для передачи аргументов и запуска подпроцесса.Вот почему это намного медленнее: примерно в 30 раз.С другой стороны: он работает по сети.

Отладочный вывод:

/usr/local/bin/python3.7 "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py" --multiproc --qt-support=auto --client 127.0.0.1 --port 51876 --file /Users/alex/PycharmProjects/game/object_type.py
pydev debugger: process 1526 is connecting

Connected to pydev debugger (build 191.6605.12)
Size of G by sys 56 asizeof 57126904
G created in 7.79
Process ForkPoolWorker-3:
Process ForkPoolWorker-2:
Process ForkPoolWorker-1:
Process ForkPoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 297, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 297, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 110, in worker
    task = get()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/queues.py", line 354, in get
    with self._rlock:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 110, in worker
    task = get()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/synchronize.py", line 102, in __enter__
    return self._semlock.__enter__()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/queues.py", line 354, in get
    with self._rlock:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/synchronize.py", line 102, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 297, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 110, in worker
    task = get()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/queues.py", line 355, in get
    res = self._reader.recv_bytes()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/connection.py", line 219, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/connection.py", line 410, in _recv_bytes
    buf = self._recv(4)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/connection.py", line 382, in _recv
    chunk = read(handle, remaining)
Traceback (most recent call last):
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 297, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 110, in worker
    task = get()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/queues.py", line 354, in get
    with self._rlock:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/synchronize.py", line 102, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 733, in next
    item = self._items.popleft()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py", line 1741, in <module>
    main()
  File "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py", line 1735, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py", line 1135, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/Applications/PyCharm CE.app/Contents/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/alex/PycharmProjects/game/object_type.py", line 100, in <module>
    outputs = list(p.imap(factorial, [(i, t0, G) for i in (4, 8, 12, 20, 16)]))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 737, in next
    self._cond.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt
...