Как заставить актера делать две вещи одновременно? - PullRequest
0 голосов
/ 01 марта 2019

Я определяю learner и worker.Мне бы хотелось, чтобы learner выполнял свою функцию-член learn в фоновом режиме, и время от времени worker отправляет learner некоторую информацию для печати. ​​

Следующий код является примером

import ray

@ray.remote
class Learner():
    def __init__(self):
        pass

    def learn(self):
        while True:
            pass # do something, such as updating network 

    def log_score(self, score):
        print('worker', score)

@ray.remote
class Worker():
    def __init__(self, learner):
        self.learner = learner

    def sample(self):
        for i in range(1000000):
            if i % 1000 == 0:
                self.learner.log_score.remote(i)

ray.init()

learner = Learner.remote()
worker = Worker.remote(learner)


worker.sample.remote()
learner.learn.remote()

while True:
    pass

Однако learner не будет запускаться log_score до тех пор, пока не завершится learn, а это не то, что я хочу.Я подумал о том, как заставить это работать: вместо явного вызова Learner.learn у меня есть Worker.В частности, я переопределяю learn и sample следующим образом

"""Learner"""
def learn(self):
    # no loop here
    pass # do something, such as updating network 

"""Worker"""
def sample(self):
    for i in range(1000000):
        if i % 1000 == 0:
            self.learner.learn.remote()
            self.learner.log_score.remote(i)

Хотя это работает, но теперь я должен контролировать, как часто следует вызывать learn, что выглядит отчасти избыточно.Есть ли способ лучше достичь того, чего я хочу?

1 Ответ

0 голосов
/ 02 марта 2019

Это отличный вопрос.В актерской модели Рэя каждая актерская задача является атомарной в том смысле, что актер будет выполнять задачу одновременно и не начнет новую, пока не вернется предыдущая.Этот выбор упрощает рассуждения о параллелизме, но затрудняет, чтобы актер делал две вещи одновременно.

Чтобы сделать что-то подобное, у вас, по сути, есть два варианта.

  1. Потоки: Пусть актер выполняет некоторую работу в фоновом потоке и оставляет основной поток актера без дела, чтобы он мог выполнять новые задачи.

    import ray
    import threading
    import time
    
    @ray.remote
    class Actor(object):
        def __init__(self):
            self.value = 0
            self.t = threading.Thread(target=self.update, args=())
            self.t.start()
    
        def update(self):
            while True:
                time.sleep(0.01)
                self.value += 1
    
        def get_value(self):
            return self.value
    
    ray.init()
    
    # Create the actor. This will start a long-running thread in the background
    # that updates the value.
    a = Actor.remote()
    
    # Get the value a couple times.
    print(ray.get(a.get_value.remote()))
    print(ray.get(a.get_value.remote()))
    
  2. Меньшие единицы работы: Это означает реструктуризацию кода так, чтобы никакой метод субъекта не зацикливался вечно.В вашем примере вы можете вернуть функцию learn после некоторого количества проходов через цикл.В этом случае новые learn задачи должны постоянно отправляться.Можно даже сделать так, чтобы метод learn отправлял возврат и отправлял себя, чтобы можно было планировать другие методы между ними.Есть много способов сделать это, которые будут зависеть от вашего приложения, но этот пример приведен ниже.

    import ray
    import threading
    import time
    
    @ray.remote
    class Actor(object):
        def __init__(self):
            self.value = 0
    
        def set_handle_to_self(self, handle_to_self):
            self.handle_to_self = handle_to_self
    
        def learn(self):
            for _ in range(10):
                time.sleep(0.01)
                self.value += 1
    
            # Submit the learn task again so that the learning continues
            # but other methods can be scheduled in between.
            self.handle_to_self.learn.remote()
    
        def get_value(self):
            return self.value
    
    ray.init()
    
    # Create the actor. This will start a long-running thread in the background
    # that updates the value.
    a = Actor.remote()
    # Give the actor a handle to itself so that it can submit tasks to itself.
    a.set_handle_to_self.remote(a)
    
    # Start the learning, which will continue forever.
    a.learn.remote()
    
    # Get the value a couple times.
    print(ray.get(a.get_value.remote()))
    print(ray.get(a.get_value.remote()))
    
...