Эффективность многопоточной коммуникации Python - PullRequest
0 голосов
/ 06 декабря 2018

Я новичок в многозадачности Python.Я делаю это старомодным способом:

Я наследую от многопоточности. Использую очереди и использую очереди. Очереди для отправки сообщений в / из основного потока.

Это мойбазовый поточный класс:

class WorkerGenerico(threading.Thread):
    def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
        super(WorkerGenerico, self).__init__()
        self._task_id = task_id
        if input_q is None:
            self._input_q = queue.Queue()
        else:
            if isinstance(input_q, queue.Queue):
                self._input_q = input_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if output_q is None:
            self._output_q = queue.Queue()
        else:
            if isinstance(output_q, queue.Queue):
                self._output_q = output_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if not isinstance(keep_alive, int):
            raise TypeError("El valor de keep_alive debe der un int.")
        self._keep_alive = keep_alive
        self.stoprequest = threading.Event()

    # def run(self):
    #    Implement a loop in subclases which checks if self.has_orden_parada() is true in order to stop.

    def join(self, timeout=None):
        self.stoprequest.set()
        super(WorkerGenerico, self).join(timeout)

    def gracefull_stop(self):
        self.stoprequest.set()

    def has_orden_parada(self):
        return self.stoprequest.is_set()

    def put(self,texto, block=True, timeout=None):
        return self._input_q.put(texto, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        return self._output_q.get(block=block, timeout=timeout)

Мой вопрос заключается в том, насколько дорого вызывать WorkerGenerico.get () извне, поскольку он предпочитает хранить очередь в основном потоке и использовать Queue.get (). Оба метода выглядят схожими по производительности с небольшими нечастыми управляющими сообщениями , однако, я предполагаю, что очень частые вызовы сделают метод B оправданным для использования.-потребляющий (он должен каким-то образом вызывать метод из внешнего потока и передавать определение очереди обратно, я полагаю, что потеря зависит от реализации Python), однако конечный код более читабелен и интуитивно понятен.

Если бы мне пришлось судить по опыту работы с другими языками, я бы сказал, что метод B намного лучше, а я прав?

Метод A:

def main()
    worker = WorkerGenerico(task_id=1)
    worker.start()
    print(worker.get())

Метод B:

def main()
    input_q = Queue()
    output_q = Queue()
    worker = WorkerGenerico(task_id=1, input_q=input_q, output_q=output_q)
    worker.start()
    print(output_q.get())

Кстати: для полноты картины я хотел бы поделиться тем, как я это делаю сейчас.Это смесь обоих методов, которая предлагает хороший конверт для потоков:

class EnvoltorioWorker:
    def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
        if input_q is None:
            self._input_q = queue.Queue()
        else:
            if isinstance(input_q, queue.Queue):
                self._input_q = input_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if output_q is None:
            self._output_q = queue.Queue()
        else:
            if isinstance(output_q, queue.Queue):
                self._output_q = output_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        self.worker = WorkerGenerico(task_id, input_q, output_q, keep_alive)

    def put(self, elem, block=True, timeout=None):
        return self._input_q.put(elem, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        return self._output_q.get(block=block, timeout=timeout)

Я использую EnvoltorioWorker.worker. * Для вызова объединений или других внешних методов управления и EnvoltorioWorker.get / EnvoltorioWorker.put для связи свнутренний класс правильно, вот так:

def main()
    worker_container = EnvoltorioWorker(task_id=1)
    worker_container.worker.start()
    print(worker_container.get())

Обычно я также делаю интерфейсы для start (), join () и nonwait_stop () в EnvoltorioWorker, если другой доступ к работнику не нужен.

Это может выглядеть глупо, и, возможно, есть лучшие способы достичь этого, поэтому:

Какой метод (A или B) является лучшей практикой?Является ли наследование от Thread правильным способом обработки потоков в Python? Я использую dispycos для распределенных сред и аналогичных конвертов для связи со своими потоками

РЕДАКТИРОВАТЬ: Только что заметил, что забыл перевести комментарии и некоторыеСтроки в классах, но они достаточно просты, поэтому я думаю, что они читабельны.Я отредактирую его, когда у меня будет время.

Есть мысли?

1 Ответ

0 голосов
/ 07 декабря 2018

Ваша очередь на самом деле не хранится в потоке.Предполагая, что здесь CPython, все объекты хранятся в куче, а потоки имеют только собственный стек.Объекты в куче совместно используются всеми потоками в одном и том же процессе.

Управление памятью в Python включает частную кучу, содержащую все объекты Python и структуры данных.Управление этой частной кучей обеспечивается внутренним менеджером памяти Python.Менеджер памяти Python имеет различные компоненты, которые имеют дело с различными аспектами динамического управления хранением, такими как совместное использование, сегментация, предварительное распределение или кэширование. документы

Из этого следует, что это не вопрос , где находится ваш объект (ваша очередь), потому что он всегда в куче.Переменные (имена) в Python являются просто ссылками на эти объекты.

Вещи, которые влияют на вашу среду выполнения, включают то, сколько фреймов вызовов вы добавляете в свои стеки, вкладывая вызовы функций / методов, и сколько ~ инструкций байт-кода вам нужно.Так как это влияет на время?


Тест

Рассмотрим следующую фиктивную настройку для очереди и работника.Для простоты здесь фиктивный рабочий не используется, поскольку многопоточность не влияет на временные параметры в сценарии, где мы притворяемся, что просто истощаем предварительно заполненную очередь.

class Queue:
    def get(self):
        return 1

class Worker:
    def __init__(self, queue):
        self.queue = queue
        self.quick_get = self.queue.get # a reference to a method as instance attribute

    def get(self):
        return self.queue.get()

    def quick_get_method(self):
        return self.quick_get()

Как вы можете видеть, Workerимеет две версии get-методов, get таким образом, как вы определяете его и quick_get_method, что на одну байт-кодовую инструкцию короче, как мы увидим позже.Рабочий экземпляр содержит не только ссылку на экземпляр queue, но также непосредственно на queue.get через self.quick_get, где мы оставляем одну инструкцию.

Теперь время для сравнения всех возможностей с .get() из фальшивой очереди в сеансе IPython:

q = Queue()
w = Worker(q)

%timeit q.get()
285 ns ± 1.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit w.get()
609 ns ± 2.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit w.quick_get()
286 ns ± 0.756 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit w.quick_get_method()
555 ns ± 0.855 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

Обратите внимание, что нет разницы во времени между q.get() и w.quick_get().Также обратите внимание на улучшенную синхронизацию w.quick_get_method() по сравнению с традиционной w.get().Использование Worker-method для вызова get() в очереди все еще почти удваивает время по сравнению с q.get() и w.quick_get().Почему это так?

Можно получить читаемую человеком версию инструкций байт-кода Python, над которой работает переводчик, с помощью модуля dis.

import dis

dis.dis(q.get)
  3           0 LOAD_CONST               1 (1)
              2 RETURN_VALUE

dis.dis(w.get)
  8           0 LOAD_FAST                0 (self)
              2 LOAD_ATTR                0 (queue)
              4 LOAD_METHOD              1 (get)
              6 CALL_METHOD              0
              8 RETURN_VALUE

dis.dis(w.quick_get)
  3           0 LOAD_CONST               1 (1)
              2 RETURN_VALUE

dis.dis(w.quick_get_method)
 11           0 LOAD_FAST                0 (self)
              2 LOAD_METHOD              0 (quick_get)
              4 CALL_METHOD              0
              6 RETURN_VALUE

Имейте в виду, что нашипустышка Queue.get здесь просто возвращается 1. Вы видите, что q.get - это то же самое, что и w.quick_get, что также отражено в сроках, которые мы видели раньше.Обратите внимание, что w.quick_get_method напрямую загружает quick_get, что является просто еще одним именем / переменной для объекта, на который ссылается queue.get.

Вы также можете получить глубину стека, напечатанную с помощью dis module:

def print_stack_depth(f):
    print(*[s for s in dis.code_info(f).split('\n') if
            s.startswith('Stack size:')]
    )

print_stack_depth(q.get)
Stack size:        1 
print_stack_depth(w.get)
Stack size:        2
print_stack_depth(w.quick_get)
Stack size:        1
print_stack_depth(w.quick_get_method)
Stack size:        2

Различия в байт-коде и синхронизации между различными подходами подразумевают (менее удивительно), что добавление еще одного кадра (добавление другого метода) обеспечивает наибольшее снижение производительности.


Обзор

Приведенный выше анализ не является неявной просьбой не использовать дополнительные методы Worker для вызова методов на объектах, на которые имеются ссылки (queue.get).Для удобочитаемости, ведения журнала и упрощения отладки это как раз то, что нужно сделать.Оптимизации, такие как Worker.quick_get_method, вы также найдете, например, в Stdlib's multiprocessing.pool.Pool, который также использует очереди для внутреннего использования.

Чтобы определить время из эталонного теста в перспективе, несколько сотен наносекунд - это немного (дляPython).В Python 3 максимальный интервал времени по умолчанию, который поток может содержать GIL и, следовательно, выполнение байт-кода с натяжкой, составляет 5 миллисекунд.Это 5 *1000* 1000 наносекунд.

Несколько сотен наносекунд также мало по сравнению с многопоточностью, которая накладывается в любом случае.Например, я обнаружил, что добавление 20 мкс сна после queue.put(integer) в одном потоке и простое чтение из очереди в другом потоке приводило к дополнительным издержкам, равным примерно 64,0 мкс в среднем на одну итерацию (время ожидания 20 мкс , не включенное в диапазоне 100 КБ (Python 3.7.1, Ubuntu 18.04).


Дизайн

Что касается вашего вопроса о предпочтениях дизайна, я бы определенно выбрал метод А здесь, а не метод Б. Еще больше, если ваши очереди в любом случае не используются в нескольких потоках.IMO ваше смешанное создание в последнем фрагменте излишне усложняет вещи / понимание в случае, когда вы просто используете один WorkerGenerico экземпляр для внутреннего использования (не пул рабочих потоков).Вопреки методу А, «неряшливость» вашего работника здесь также погребена глубоко внутри другого класса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...