Dispycos неблокирующие нити - PullRequest
       18

Dispycos неблокирующие нити

0 голосов
/ 26 ноября 2018

Я расширяю многопоточность. Поток для достижения паралелизма следующим образом:

class WorkerGenerico(threading. Thread):
    """ Clase con la funcionalidad principal de un worker genérico: Se inicia en un thread y se para de forma graceful
        mediante join (bloqueante). Adicionalmente permite enviar la orden de detención de forma no bloqueante mediante
        "gracefull_stop", aunque no garantiza que se detenga el thread.
        Args:
            input_q (:class:`queue.Queue`): Cola de entrada de datos al proceso. No deberia usarse, esta por escaoalabilidad.
            input_q (:class:`queue.Queue`): Cola por la que el programa envía sus resultados.

        Attributes:
            input_q (:class:`queue.Queue`): Cola de salida de entrada de datos (comandos?)
            input_q (:class:`queue.Queue`): Cola de salida de datos (resultados)
            stoprequest (:class:`threading.Event`): Condición de parada
        Raises:
    """
    def __init__(self, input_q, output_q):
        super(WorkerGenerico, self).__init__()
        self.input_q = input_q
        self.output_q = output_q
        self.stoprequest = threading.Event()

    # def run(self):
    #    pass

    def join(self, timeout=None):
        self.stoprequest.set()
        super(WorkerGenerico, self).join(timeout)

    def gracefull_stop(self):
        self.stoprequest.set()
        self.transcriptor.join()

    def has_orden_parada(self):
        return self.stoprequest.is_set()

    def _enviar_msg(self, texto):
        return self.output_q.put((self.name, texto))

    def _recibir_msg(self, timeout=0.5):
        return self.input_q.get(True, timeout)

    def _imprimir_error(self, texto):
        eprint(texto)

    def _tratar_excepcion(self, excepcion):
        self._imprimir_error("Excepcion no controlada: {0}".format(excepcion))
        eprint("Error inesperado:", sys.exc_info()[0])

Тогда .run становится неблокирующим циклом при выполнении task.start () извне._enviar_msg и _recibir_msg используют очереди для связи с основным процессом и отправки промежуточных результатов или получения новых параметров в середине цикла.Мое основное программное обеспечение не изменено, никому не нужно ждать, пока оно запустится.

Теперь мне нужно увеличить масштаб, а одного компьютера недостаточно, поэтому я решил использовать dispycos, однако, похоже, что они блокируются.задач и позволяют мне отправить только одну задачу и один раз, а затем она ожидает завершения всех задач.

Если я запускаю клиент dispycos, пример 8 , оказывается, что он никогда не выходит за рамкистрока

pycos.Task(client_proc, computation, 10 if len(sys.argv) < 2 else int(sys.argv[1]))

Чтобы дать вам представление, мои задачи выполняются в течение 23-72 часов.Их требования к ЦП являются динамическими, и я, вероятно, буду вручную контролировать количество входов в каждый узел.

Я бы хотел иметь возможность впервые распределить исходные коды на каждом компьютере, чтобы избежать определения классов длябез необходимости летать по сети.

Если возможно, я бы хотел прочитать все входящие сообщения из каждого типа задачи в одну и ту же очередь.Прямо сейчас я передаю один и тот же output_q каждому процессу (и их собственный input_q для получения направленных заказов), чтобы добиться этого и работает как чудо.

...