Python3.7 pool.apply_async, похоже, не работает для меня - PullRequest
0 голосов
/ 18 октября 2018

Итак, я пытаюсь запустить 6 процессов одновременно, в качестве теста (у меня 128-ядерный ЦП, поэтому цель - 127 процессов параллельно), и в каждом процессе я буду запускать 256 потоков для выполнения какой-либо задачи.

Я думаю, что неправильно понимаю вызов pool.apply_async, поэтому, кажется, ничего не происходит, как только вызовы проходят.Я слежу за примерами, показанными в https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers, и я не понимаю, какую ошибку я совершаю.

Это фрагмент кода, который выполняет асинхронный вызов

batch_no = 0
ra = []
for worker_ip in worker_ip_list:
    logg.log("debug","attempting to do async process invocation for workload batch ="+str(batch_no))
    r = worker_pool.apply_async(self.run_worker_for_multi_task,(target_function,worker_ip,threads_per_worker,))
    ra.append(r)
    try:
        logg.log("debug","work pool async call ready status ="+str(r.successful()))
    except Exception:
        logg.log_stacktrace()
    batch_no = batch_no + 1

В начале self.run_worker_for_multi_task есть несколько операторов журнала, но я не вижу, чтобы они выполнялись.

Вот начало метода.

    def run_worker_for_multi_task(self,tf,worker_ip_list,thread_batch_size):
        l = self.logger.log
        worker_output = Queue()
        l("info","started worker process with PID="+str(os.getpid()))
        l("info","thread batch size is = "+str(thread_batch_size))
        l("debug","creating thread batches...")
...

Но это вывод, который я получаю.

Thu Oct 18 15:38:22 2018 -- INFO -- [directory watcher] directory watching running a scan cycle.
Thu Oct 18 15:38:23 2018 -- DEBUG -- Process Tracker Initialized
Thu Oct 18 15:38:23 2018 -- DEBUG -- [process tracker] {'app_pid': 36935}
Thu Oct 18 15:38:23 2018 -- INFO -- number of workers set to 6
Thu Oct 18 15:38:23 2018 -- INFO -- number of threads per worker set to 256
Thu Oct 18 15:38:23 2018 -- DEBUG -- workload size is - 134208
Thu Oct 18 15:38:23 2018 -- DEBUG -- workload size per worker is going to be - 22368
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =0
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb7bda0> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =1
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb7bdd8> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =2
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb7be48> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =3
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb8b6a0> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =4
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb8b710> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =5
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb8b7b8> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- all workers completed. shared output data returned by all workers is --
Thu Oct 18 15:38:23 2018 -- DEBUG -- {}

Параллельно я выполняю команду ps -ef | grep -i python в непрерывном цикле, но я не вижу никакого увеличения процессов Python при выполнении кода.

И я знаю, что self.run_worker_for_multi_task работает хорошо, потому что я смог получить ожидаемое поведение от него, когда я вызывал его с помощью вызова Process.start().Проблема с Process.start() заключается в том, что он блокирует и предотвращает запуск других процессов до тех пор, пока процесс не присоединится.

т.е. следующий фрагмент кода не выполняет параллельный список процессов.Он блокируется при первом process.start() звонке

logg.log("debug","creating workers...")
for worker_ip in worker_ip_list:
    worker_inst = Process(target=self.__run_worker_for_multi_task,args=(target_function,worker_ip,q,threads_per_worker,))
    worker_list.append(worker_inst)
logg.log("debug","workers created.")
logg.log("debug","starting workers.")
for worker_inst in worker_list:
    worker_inst.start()
    logg.log("info","starting worker "+str(worker_inst) +" with pid="+str(worker_inst.pid))
logg.log("debug","workers are started")
logg.log("debug","waiting for all workers to complete their tasks")
for worker_inst in worker_list:
    worker_inst.join()

Что мне здесь не хватает?Почему я не вижу, как запускаются шесть процессов, и не вижу записи журнала от цели?Как запустить функцию в нескольких процессах параллельно?

1 Ответ

0 голосов
/ 19 октября 2018

Так что я до сих пор не знаю, что происходит с Async в пуле.Но я понял, почему процесс заблокирован, а Process.start() не работает.цель должна быть в общем доступе, иначе контекст не может быть передан другим процессам.Так что смена цели на публичный метод сделала свое дело.

...