Проблема с попыткой использовать многопроцессорную очередь Python с RAY - PullRequest
0 голосов
/ 19 мая 2019

Я хотел бы использовать актера RAY для порождения рабочих, и каждый работник будет использовать многопроцессорный пакет Python для порождения процесса для генерации случайных целочисленных данных.Каждый работник будет хранить данные в своей локальной очереди или общей глобальной очереди.Очереди реализованы с помощью многопроцессорного пакета Python.

Я подтвердил, что рабочие появляются и правильно генерируют данные.Проблема здесь состоит в том, чтобы заставить их хранить данные в очередях.

Я знаю, что могу просто использовать пакет многопроцессорной обработки Python для достижения желаемого, но Мне бы очень хотелось знать, как правильно использовать очереди многопроцессорной обработки Python с актером Рэя. Если невозможно заставить их работать вместе, есть ли альтернатива?

Я предоставил 3 куска кода, A, B и C. Коды были запущены в Google Colab.A запускается без проблем, порождает рабочих и генерирует данные правильно, очереди не использовались.B включает только локальные рабочие очереди, а C включает только глобальную очередь без каких-либо локальных рабочих очередей.Оба B & C выдают ошибки.

A работает без проблем, порождает рабочих и генерирует данные правильно, очереди не использовались:

import numpy as np
import ray, time
from multiprocessing import Process

@ray.remote
class Worker(object):
  def __init__(self, w_id):
    self.w_id = w_id
    self.process = Process(target=self._generate_data, args=())
    self.process.start() 

  def _generate_data(self):
    while True:
      data = np.random.randint(1,4)
      time.sleep(1)
      print(self.w_id, data)
    return self.w_id, data

if __name__ == '__main__':  
  ray.init(ignore_reinit_error=True)

  Ws = [Worker.remote(w_id) for w_id in range(2)]
  i = 0
  while True:
    time.sleep(1)
    print(i)
    if i>9:
      break
    i+=1

  ray.shutdown()

Bвключает только локальные рабочие очереди (выдает ошибку):

import numpy as np
import ray, time    
from multiprocessing import Process, Queue

@ray.remote
class Worker(object):
  def __init__(self, w_id):
    self.w_id = w_id
    self.queue = Queue()
    self.process = Process(target=self._generate_data, args=())
    self.process.start() 

  def _generate_data(self):
    while True:
      data = np.random.randint(1,4)
      self.queue.put(data)
      time.sleep(1)
    return self.w_id   

if __name__ == '__main__':  
  ray.init(ignore_reinit_error=True)

  Ws = [Worker.remote(w_id) for w_id in range(1)]
  i = 0
  while True:
    time.sleep(2)
    if i>9:
      break
    i+=1

  ray.shutdown()

C включает только глобальную очередь без каких-либо локальных рабочих очередей (выдает ошибку):

import numpy as np
import ray, time
from multiprocessing import Process, Queue

@ray.remote
class Worker(object):
  def __init__(self, w_id, g_q):
    self.w_id = w_id
    self.queue = g_q
    self.process = Process(target=self._generate_data, args=(g_q))
    self.process.start() 

  def _generate_data(self, g_q):
    while True:
      data = np.random.randint(1,4)
      g_q.put(data)
      time.sleep(1)
      print(self.w_id, data)
    return self.w_id, data

if __name__ == '__main__':  
  ray.init(ignore_reinit_error=True)
  g_q = Queue()
  Ws = [Worker.remote(w_id, g_q) for w_id in range(2)]
  i = 0
  while True:
    time.sleep(1)
    print(i)
    if i>9:
      break
    i+=1

  ray.shutdown()

Вывод A правильный:

2019-05-19 02:26:42,328 WARNING worker.py:1341 -- WARNING: Not updating worker name since `setproctitle` is not installed. Install this with `pip install setproctitle` (or ray[debug]) to enable monitoring of worker processes.
2019-05-19 02:26:42,330 INFO node.py:497 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-19_02-26-42_330364_128/logs.
2019-05-19 02:26:42,442 INFO services.py:409 -- Waiting for redis server at 127.0.0.1:13715 to respond...
2019-05-19 02:26:42,580 INFO services.py:409 -- Waiting for redis server at 127.0.0.1:57570 to respond...
2019-05-19 02:26:42,584 INFO services.py:806 -- Starting Redis shard with 2.58 GB max memory.
2019-05-19 02:26:42,632 INFO node.py:511 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-19_02-26-42_330364_128/logs.
2019-05-19 02:26:42,637 INFO services.py:1441 -- Starting the Plasma object store with 3.87 GB memory using /dev/shm.
2019-05-19 02:26:42,761 WARNING actor.py:614 -- Actor is garbage collected in the wrong driver. Actor id = ActorID(75150eec35127d050bf435039332f2bcc43e6a11), class name = Worker.
2019-05-19 02:26:42,765 WARNING actor.py:614 -- Actor is garbage collected in the wrong driver. Actor id = ActorID(6791cde57023d522b51b44b3a3564c7b152beb62), class name = Worker.
0
(pid=801) 0 2
1
(pid=800) 1 2
(pid=801) 0 1
2
(pid=800) 1 3
(pid=801) 0 1
3
(pid=800) 1 2
(pid=801) 0 3
4
(pid=800) 1 1
(pid=801) 0 3
5
(pid=800) 1 1
(pid=801) 0 3
6
(pid=800) 1 1
(pid=801) 0 2
7
(pid=800) 1 3
(pid=801) 0 2
8
(pid=800) 1 2
(pid=801) 0 2
9
(pid=800) 1 3
(pid=801) 0 3
10

Сообщение об ошибке B:

2019-05-19 02:21:17,932 WARNING worker.py:1341 -- WARNING: Not updating worker name since `setproctitle` is not installed. Install this with `pip install setproctitle` (or ray[debug]) to enable monitoring of worker processes.
2019-05-19 02:21:17,935 INFO node.py:497 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-19_02-21-17_934897_128/logs.
2019-05-19 02:21:18,047 INFO services.py:409 -- Waiting for redis server at 127.0.0.1:42660 to respond...
2019-05-19 02:21:18,184 INFO services.py:409 -- Waiting for redis server at 127.0.0.1:55986 to respond...
2019-05-19 02:21:18,188 INFO services.py:806 -- Starting Redis shard with 2.58 GB max memory.
2019-05-19 02:21:18,231 INFO node.py:511 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-19_02-21-17_934897_128/logs.
2019-05-19 02:21:18,234 INFO services.py:1441 -- Starting the Plasma object store with 3.87 GB memory using /dev/shm.
2019-05-19 02:21:18,341 WARNING actor.py:614 -- Actor is garbage collected in the wrong driver. Actor id = ActorID(4171cdedcc979400748671ace92a1bcefb10f213), class name = Worker.
2019-05-19 02:21:23,976 ERROR worker.py:1616 -- Possible unhandled error from worker: ray_worker (pid=320, host=0366b816fc55)
  File "<ipython-input-4-d91015ddd20f>", line 9, in __init__
  File "/usr/lib/python3.6/multiprocessing/context.py", line 101, in Queue
    from .queues import Queue
KeyError: "'__name__' not in globals"

Сообщение об ошибке C:

2019-05-19 02:43:09,760 WARNING worker.py:1341 -- WARNING: Not updating worker name since `setproctitle` is not installed. Install this with `pip install setproctitle` (or ray[debug]) to enable monitoring of worker processes.
2019-05-19 02:43:09,763 INFO node.py:497 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-19_02-43-09_763458_128/logs.
2019-05-19 02:43:09,888 INFO services.py:409 -- Waiting for redis server at 127.0.0.1:36483 to respond...
2019-05-19 02:43:10,060 INFO services.py:409 -- Waiting for redis server at 127.0.0.1:46985 to respond...
2019-05-19 02:43:10,070 INFO services.py:806 -- Starting Redis shard with 2.58 GB max memory.
2019-05-19 02:43:10,126 INFO node.py:511 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-19_02-43-09_763458_128/logs.
2019-05-19 02:43:10,128 INFO services.py:1441 -- Starting the Plasma object store with 3.87 GB memory using /dev/shm.
2019-05-19 02:43:10,341 WARNING worker.py:342 -- WARNING: Falling back to serializing objects of type <class '_multiprocessing.SemLock'> by using pickle. This may be inefficient.
2019-05-19 02:43:10,348 WARNING worker.py:402 -- WARNING: Serializing the class <class 'multiprocessing.queues.Queue'> failed, so are are falling back to cloudpickle.
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/ray/worker.py in put_object(self, object_id, value)
    382         try:
--> 383             self.store_and_register(object_id, value)
    384         except pyarrow.PlasmaObjectExists:

26 frames
TypeError: can't pickle _multiprocessing.SemLock objects

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
/usr/lib/python3.6/multiprocessing/context.py in assert_spawning(obj)
    354         raise RuntimeError(
    355             '%s objects should only be shared between processes'
--> 356             ' through inheritance' % type(obj).__name__
    357             )

RuntimeError: Queue objects should only be shared between processes through inheritance
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...