Как сделать блок рабочего процесса в ожидании многопроцессорной обработки Value или Array? - PullRequest
0 голосов
/ 25 сентября 2018

В этом документе показан пример совместного использования состояния между процессами, использующими Value и Array из библиотеки multiprocessing:

из многопроцессорного импорта Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
    a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

Будет напечатано

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Мои вопросы

  1. Как вы продолжите передавать информацию другому процессу, а не во время созданиярабочий процесс?

  2. Как можно заставить рабочий процесс блокировать (или приостанавливать) ожидание события от родительского процесса через этот механизм?

Моя платформа - Windows 10. Общая память может быть разделена между процессами, но процессы fork () или spawn () не могут наследовать семафор, блокировку, очередь и т. Д.

Спасибо.

[Обновление 1]

Демонстрация @ Manu-Valdés работает.Но я сделал пример не работает, возможно, вы могли бы помочь определить проблему.

%%file ./examples/multiprocessing_pool5.py
# This code definitely will not work in Windows as queue object is not fork() along.
import multiprocessing
import os

def f1(q):
  x = q.get(True) # Block until something is in the queue
  if x == 55:
    raise Exception('I do not like 55!')
  elif x == 100:
    return
  else:
    print(f'f1({x}) -> {x*x}')


def f2(q):
  x = q.get(True) # Block until something is in the queue
  if x == 55:
    raise Exception('I do not like 55!')
  elif x == 100:
    return
  else:
    print(f'f2({x}) -> {x*x}')


def wp_init(q):
  #global queue
  #queue = q  # Point to the global queue in each process
  print(f'I am initialized')


def success_cb(result):
  print(f'Success returns = {result}')


def failure_cb(result):
  print(f'Failure returns = {result}')


if __name__ == '__main__':
  np = os.cpu_count()  # Number of cores per CPU
  queue = multiprocessing.Queue()
  pool = multiprocessing.Pool(np, initializer=wp_init, initargs=(queue,))

  for x in range(100):
    if x % 2 == 0:
      f = f1
    else:
      f = f2
  pool.apply_async(f, args=(queue,), callback=success_cb, error_callback=failure_cb)

  for x in range(100):
    queue.put(x)

  # Terminate them but I do not know how to loop through the processes
  for _ in range(100):
    queue.put(100)  # Terminate it

  pool.close()
  pool.join()

Ошибка

I am initialized
I am initialized
I am initialized
I am initialized
Failure returns = Queue objects should only be shared between processes through inheritance

Ответы [ 2 ]

0 голосов
/ 26 сентября 2018

Позвольте мне ответить на мой собственный вопрос.Ниже приведены некоторые из моих пониманий:

а) apply_async() возвращается немедленно.Я использую multiprocessing.Manager() при создании Queue, Value и Array, чтобы избежать ошибки Synchronized objects should only be shared between processes through inheritance или xxx objects should only be shared between processes through inheritance.

b) Использоватьmultiprocessing.Queue для сигнализации, остановки, завершения рабочих процессов из родительского процесса.

c) Невозможно передавать разные сообщения для разных рабочих процессов, ожидающих в одной и той же очереди.Вместо этого используйте разные очереди.

d) Pool.apply_async() позволяет только основной функции входа для рабочего процесса принимать один аргумент.В этом случае поместите аргументы в список ([]).

e) Мы могли бы использовать multiprocessing.sharedctypes.RawValue(), multiprocessing.sharedctypes.RawArray(), multiprocessing.sharedctypes.Value() и Array multiprocessing.sharedctypes.Array() для создания значения ctypes,массив ctypes, значение ctypes с необязательной блокировкой и массив ctypes с необязательными блокировками в общей памяти.Совместно используемые объекты могут передаваться рабочим процессам через аргументы ключевых слов initializer и initargs при создании объекта Pool с использованием multiprocessing.Pool().Эти совместно используемые объекты не могут быть переданы с использованием методов Pool.apply_async() или Pool.map().

f) Стандартная документация Python для многопроцессорной обработки нуждается в обновлении.Например,

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) должно быть записано как class multiprocessing.pool.Pool([processes [, initializer=None [, initargs=None [, maxtaskperchild=None [, context=None]]]]])

import multiprocessing as mp
import time

# Worker process 1
def f1(q):
  while True:
    x = queue.get(True) # Block until there is message
    if x >= 20:
      raise Exception(f'f1: I do not like {x}!')
    elif x == -1:
      print(f'f1: Quit')
      return "f1"
    else:
      time.sleep(0.5)
      v = q[0]
      a = q[1]
      print(f'f1({x}, {v}, {a})')


# Worker process 2
def f2(q):
  while True:
    x = queue.get(True) # Block until there is message
    if x >= 20:
      raise Exception(f'f2: I do not like {x}!')
    elif x == -1:
      print(f'f2: Quit')
      return "f2"
    else:
      time.sleep(0.5)
      v = q[0]
      a = q[1]
      print(f'f1({x}, {v}, {a})')


def pInit(q, poolstr):
  '''
  Initialize global shared variables among processes.
  Could possibly share queue and lock here
  '''
  global queue
  queue = q  # Point to the global queue in each process
  print(f'{poolstr} is initialized')


def succCB(result):
  print(f'Success returns = {result}')


def failCB(result):
  print(f'Failure returns = {result}')


if __name__ == '__main__':

  # Create shared memory to pass data to worker processes
  # lock=True for multiple worker processes on the same queue
  v1 = mp.Manager().Value('i', 0, lock=True)
  a1 = mp.Manager().Array('i', range(20), lock=True)
  # lock=False for 1 worker process on the queue
  v2 = mp.Manager().Value('i', 0, lock=False)
  a2 = mp.Manager().Array('i', range(20), lock=False)

  # Create queues for signaling worker processes
  queue1 = mp.Manager().Queue()
  queue2 = mp.Manager().Queue()

  # Creating pool of processes now - fork here
  pool1 = mp.Pool(2, initializer=pInit, initargs=(queue1, "pool1"))
  pool2 = mp.Pool(1, initializer=pInit, initargs=(queue2, "pool2"))

  # Assign entry function for each pool
  pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
  pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
  pool2.apply_async(f2, args=[(v2, a2)], callback=succCB, error_callback=failCB)

  # Parent process, worker processes do not see this anymore
  # Parent process notifies the worker processes
  for x in range(20):
    a1[x] = x
    a2[x] = x+10
  v1.value = 2
  v2.value = 18
  queue1.put(1) 
  queue1.put(1) 
  queue2.put(18)

  # Parant processes terminate or quit the worker processes
  queue1.put(-1) # Quit properly
  queue1.put(20) # Raise exception
  queue2.put(-1) # Quit properly

  pool1.close()
  pool2.close()
  pool1.join()
  pool2.join()

Выходные данные

pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
pool2 is initialized
f1(18, Value('i', 18), array('i', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]))
f2: Quit
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
f1: Quit
Success returns = f1
Success returns = f2
Failure returns = f1: I do not like 20!
0 голосов
/ 25 сентября 2018

Для связи в поточном режиме вы можете использовать Queue.Метод get() блокирует, если очередь пуста, и ожидает, пока новый элемент не станет put():

from multiprocessing import Process, Queue

def f(q):
    while True:
        element = q.get()
        print(element)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    q.put([42, None, 'hello'])
    p.join()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...