Хранение и извлечение объекта в ray.io - PullRequest
0 голосов
/ 01 апреля 2020

У меня есть кластер лучей, работающий на компьютере, как показано ниже:

ray start --head --redis-port=6379

У меня есть два файла, которые нужно запустить в кластере. Producer p_ray.py:

import ray
ray.init(address='auto', redis_password='5241590000000000')


@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n


counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(futures, type(futures[0]))
obj_id = ray.put(ray.get(futures))
print(obj_id)
print(ray.get(obj_id))
while True:
    pass

Consumer c_ray.py:

import ray
ray.init(address='auto', redis_password='5241590000000000')
[objs] = ray.objects()
print('OBJ-ID:', objs, 'TYPE:', type(objs))
print(ray.get([objs]))

Мое намерение состоит в том, чтобы сохранить объекты-фьючерсы от производителя и извлечь их у потребителя. Я могу получить идентификатор объекта в потребителе. Однако доход от потребителя никогда не вернется.

Что я делаю не так?

Как мне решить мое требование?

1 Ответ

0 голосов
/ 05 апреля 2020

Этот конкретный случай может быть ошибкой (я не уверен на 100%). Я создал проблему в Ray Github.

Но это не хороший способ получить объект, созданный p_ray.py. Если у вас много объектов, управлять им будет чрезвычайно сложно. Вы можете реализовать аналогичную вещь, используя detached actor. https://ray.readthedocs.io/en/latest/advanced.html#detached -actors .

Идея заключается в создании отдельного субъекта, который может быть получен любым водителем / работником, работающим в том же кластере.

p_ray.py

import ray
ray.init(address='auto', redis_password='5241590000000000')

@ray.remote
class DetachedQueue:
    def __init__(self):
        self.dict = {}
    def put(key, value):
        self.dict[key] = value
    def get(self):
        return self.dict


@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n


queue = DetachedQueue.remote(name="queue_1", detached=True)
counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(futures, type(futures[0]))
queue.put.remote("key", ray.get(futures)))
while True:
    pass

c_ray.py:

import ray
ray.init(address='auto', redis_password='5241590000000000')
queue = ray.util.get_actor("queue_1")
print(ray.get(queue.get.remote()))
...