Редактировать: я закрываю этот вопрос. Как оказалось, моя цель иметь параллельные сообщения HTTP бессмысленно. После успешной реализации с помощью aiohttp я столкнулся с тупиками в других частях конвейера. Я переформулирую это и опубликую один вопрос через несколько дней.
Задача
Я хочу иметь класс, который во время некоторых других вычислений будет содержать сгенерированные данные и можете записать его в БД через HTTP (подробности ниже), когда это удобно. Это должен быть класс, так как он также используется для загрузки / представления / манипулирования данными.
Я написал наивную, не параллельную реализацию, которая работает: класс инициализируется и затем используется в "main l oop ». Данные добавляются в эту главную l oop в наивную «очередь» (список HTTP-запросов). Через определенные интервалы в основном l oop класс вызывает функцию для записи этих запросов и очистки «очереди».
Как вы можете ожидать, это связано с IO. Всякий раз, когда мне нужно написать «очередь», основной l oop останавливается. Кроме того, поскольку основные вычисления выполняются на графическом процессоре, l oop также не привязан к процессору.
По сути, я хочу иметь очередь, и, скажем, десять рабочих, работающих в фоновом режиме и помещающих элементы в коннектор http, ожидая, пока pu sh завершит работу sh, и затем получат следующий (или просто ожидание следующего вызова записи, не имеет большого значения). Тем временем мой основной l oop запускается и добавляет в очередь.
Пример программы
Моя наивная программа выглядит примерно так
class data_storage(...):
def add(...):
def write_queue(self):
if len(self.queue) > 0:
res = self.connector.run(self.queue)
self.queue = []
def main_loop(storage):
# do many things
for batch in dataset: #simplified example
# Do stuff
for some_other_loop:
(...)
storage.add(results)
# For example, call each iteration
storage.write_queue()
if __name__ == "__main__":
storage=data_storage()
main_loop(storage)
...
Подробно: класс соединителя из пакета 'neo4j-connector' для публикации в моей базе данных Neo4j. По сути, он JSON форматирует и использует API "запросов" от python. Это работает, даже без реальной очереди, поскольку ничего не происходит одновременно.
Теперь я должен заставить его работать одновременно. Из моего исследования я понял, что в идеале я хотел бы иметь модель «производитель-потребитель», где оба инициализируются через asyncio. Я видел это только с помощью функций, а не классов, поэтому я не знаю, как к этому подойти. С функциями моя главная l oop должна быть сопрограммой производителя, а моя функция записи становится потребителем. Оба инициируются как задачи в очереди, а затем собираются, где я инициализирую только одного производителя, но много потребителей.
Моя проблема в том, что основной l oop включает в себя части, которые уже параллельны (например, PyTorch) , Asyncio не является потокобезопасным, поэтому я не думаю, что могу просто обернуть все в asyn c декораторах и создать совместную процедуру. Именно поэтому я хочу, чтобы DB logi c находилась в отдельном классе.
Я также на самом деле не хочу или не хочу, чтобы основной l oop запускался "одновременно" в одном потоке с работников. Но хорошо, если это так, потому что рабочие не слишком много работают с процессором. Но с технической точки зрения я хочу многопоточность? Понятия не имею.
Мой единственный вариант - записать в очередь, пока она не будет "заполнена", остановить l oop и затем использовать несколько потоков для выгрузки его в БД. Тем не менее, это будет гораздо медленнее, чем делать это, пока работает основной l oop. Мой выигрыш будет минимальным, просто параллелизм при работе через очередь. Я бы согласился на это, если понадобится.
Однако, из сообщения stackoverflow, я придумал это небольшое изменение
class data_storage(...):
def add(...):
def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
return wrapped
@background
def write_queue(self):
if len(self.queue) > 0:
res = self.connector.run(self.queue)
self.queue = []
Шокирующе этот вид "работает" и невероятно быстр , Конечно, поскольку это не настоящая очередь, все перезаписывается. Кроме того, это перегружает или блокирует HTTP API и, как правило, приводит к множеству ошибок.
Но поскольку это - в принципе - работает, мне интересно, смогу ли я сделать следующее:
class data_storage(...):
def add(...):
def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
return wrapped
@background
def post(self, items):
if len(items) > 0:
self.nr_workers.increase()
res = self.connector.run(items)
self.nr_workers.decrease()
def write_queue(self):
if self.nr_workers < 10:
items=self.queue.get(200) # Extract and delete from queue, non-concurrent
self.post(items) # Add "Worker"
для некоторых гипотетических объектов очереди и объектов nr_workers. Затем в конце основного l oop есть функция, которая блокирует прогресс до тех пор, пока число рабочих не станет равным нулю, и не будет одновременно очищать оставшуюся очередь.
Это кажется монументально плохой идеей , но я не знаю, как еще это реализовать. Если это ужасно, я хотел бы знать, прежде чем начать больше работать над этим. Как вы думаете, это будет работать?
В противном случае, не могли бы вы дать мне какие-нибудь советы о том, как правильно подходить к этой ситуации? Конечно, достаточно ключевых слов, инструментов или предметов для исследования.
Спасибо!