Python asyncio: очередь, параллельная нормальному коду - PullRequest
0 голосов
/ 01 апреля 2020

Редактировать: я закрываю этот вопрос. Как оказалось, моя цель иметь параллельные сообщения HTTP бессмысленно. После успешной реализации с помощью aiohttp я столкнулся с тупиками в других частях конвейера. Я переформулирую это и опубликую один вопрос через несколько дней.

Задача

Я хочу иметь класс, который во время некоторых других вычислений будет содержать сгенерированные данные и можете записать его в БД через HTTP (подробности ниже), когда это удобно. Это должен быть класс, так как он также используется для загрузки / представления / манипулирования данными.

Я написал наивную, не параллельную реализацию, которая работает: класс инициализируется и затем используется в "main l oop ». Данные добавляются в эту главную l oop в наивную «очередь» (список HTTP-запросов). Через определенные интервалы в основном l oop класс вызывает функцию для записи этих запросов и очистки «очереди».

Как вы можете ожидать, это связано с IO. Всякий раз, когда мне нужно написать «очередь», основной l oop останавливается. Кроме того, поскольку основные вычисления выполняются на графическом процессоре, l oop также не привязан к процессору.

По сути, я хочу иметь очередь, и, скажем, десять рабочих, работающих в фоновом режиме и помещающих элементы в коннектор http, ожидая, пока pu sh завершит работу sh, и затем получат следующий (или просто ожидание следующего вызова записи, не имеет большого значения). Тем временем мой основной l oop запускается и добавляет в очередь.

Пример программы

Моя наивная программа выглядит примерно так

class data_storage(...):
    def add(...):

    def write_queue(self):
        if len(self.queue) > 0:
            res = self.connector.run(self.queue)
            self.queue = []


def main_loop(storage):
    # do many things

    for batch in dataset: #simplified example
         # Do stuff
         for some_other_loop:
              (...)
              storage.add(results)

         # For example, call each iteration
         storage.write_queue()

if __name__ == "__main__":
    storage=data_storage()
    main_loop(storage)
...

Подробно: класс соединителя из пакета 'neo4j-connector' для публикации в моей базе данных Neo4j. По сути, он JSON форматирует и использует API "запросов" от python. Это работает, даже без реальной очереди, поскольку ничего не происходит одновременно.

Теперь я должен заставить его работать одновременно. Из моего исследования я понял, что в идеале я хотел бы иметь модель «производитель-потребитель», где оба инициализируются через asyncio. Я видел это только с помощью функций, а не классов, поэтому я не знаю, как к этому подойти. С функциями моя главная l oop должна быть сопрограммой производителя, а моя функция записи становится потребителем. Оба инициируются как задачи в очереди, а затем собираются, где я инициализирую только одного производителя, но много потребителей.

Моя проблема в том, что основной l oop включает в себя части, которые уже параллельны (например, PyTorch) , Asyncio не является потокобезопасным, поэтому я не думаю, что могу просто обернуть все в asyn c декораторах и создать совместную процедуру. Именно поэтому я хочу, чтобы DB logi c находилась в отдельном классе.

Я также на самом деле не хочу или не хочу, чтобы основной l oop запускался "одновременно" в одном потоке с работников. Но хорошо, если это так, потому что рабочие не слишком много работают с процессором. Но с технической точки зрения я хочу многопоточность? Понятия не имею.

Мой единственный вариант - записать в очередь, пока она не будет "заполнена", остановить l oop и затем использовать несколько потоков для выгрузки его в БД. Тем не менее, это будет гораздо медленнее, чем делать это, пока работает основной l oop. Мой выигрыш будет минимальным, просто параллелизм при работе через очередь. Я бы согласился на это, если понадобится.

Однако, из сообщения stackoverflow, я придумал это небольшое изменение

class data_storage(...):
    def add(...):

    def background(f):
        def wrapped(*args, **kwargs):
            return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
        return wrapped

    @background
    def write_queue(self):
        if len(self.queue) > 0:
            res = self.connector.run(self.queue)
            self.queue = []

Шокирующе этот вид "работает" и невероятно быстр , Конечно, поскольку это не настоящая очередь, все перезаписывается. Кроме того, это перегружает или блокирует HTTP API и, как правило, приводит к множеству ошибок.

Но поскольку это - в принципе - работает, мне интересно, смогу ли я сделать следующее:

class data_storage(...):
    def add(...):

    def background(f):
        def wrapped(*args, **kwargs):
            return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
        return wrapped

    @background
    def post(self, items):
        if len(items) > 0:
            self.nr_workers.increase()
            res = self.connector.run(items)
            self.nr_workers.decrease()

    def write_queue(self):         
         if self.nr_workers < 10:
               items=self.queue.get(200) # Extract and delete from queue, non-concurrent
               self.post(items) # Add "Worker"

для некоторых гипотетических объектов очереди и объектов nr_workers. Затем в конце основного l oop есть функция, которая блокирует прогресс до тех пор, пока число рабочих не станет равным нулю, и не будет одновременно очищать оставшуюся очередь.

Это кажется монументально плохой идеей , но я не знаю, как еще это реализовать. Если это ужасно, я хотел бы знать, прежде чем начать больше работать над этим. Как вы думаете, это будет работать?

В противном случае, не могли бы вы дать мне какие-нибудь советы о том, как правильно подходить к этой ситуации? Конечно, достаточно ключевых слов, инструментов или предметов для исследования.

Спасибо!

...