Я пробую несколько подходов для своего хобби-проекта, который представляет собой приложение для обработки потоков данных. Он получает данные из нескольких потоков и обрабатывает их параллельно. Есть также очень тяжелые задачи с CPU. Процессы должны взаимодействовать через очереди. Многие процессы сохраняют данные в хранилище ключей / значений LMDB. Компоненты:
- считыватели: чтение из сокетов или из баз данных фрагментами
- обработчики данных: тяжелые вычисления, получение данных от считывателей, отправка результата через очереди
- экспортеры: получение данных от процессоров данных и сохранение их, например, в файлы CSV. Контроллер приказывает, когда это делать.
- агрегаторы данных: также получают данные от процессоров данных, преобразуют их и сохраняют в LMDB.
- запрос: процесс, который взаимодействует с одним пользователем (в основном это другое приложение), которое считывает данные из нескольких агрегаторов данных
- контроллер: отслеживает все и управляет процессами, при необходимости запускает новые процессы, приостанавливает / перезапускает их и т. д. c.
Итак, все компоненты делают разные вещи и общаются друг с другом через очереди (или сокеты).
Создание нескольких процессов с многопроцессорностью - это нормально, уже протестировано. Также работал с asyncio: все было полностью разделено и выполнялось в нескольких командных окнах, и обменивались данными через сокеты, запуская сервер asynio.
Теперь пришло время как-то их объединить, потому что multipro c становится медленнее (не каждый процессы требуют больших вычислений, а также имеют место значительные простои ввода-вывода). Также хочу перейти из сокетов TCP в очереди, чтобы уменьшить использование портов, которые в противном случае должны быть зашифрованы и, надеюсь, также получить некоторую скорость. Но я понятия не имею, как это сделать. Возможно ли или разумно запускать новые циклы событий в каждом процессе? Если да, то какие очереди использовать? Если возможно, я бы хотел избежать использования сторонних библиотек, которые некоторое время не поддерживаются. Все еще использую 3.7.7 Python. Пожалуйста, помогите мне с некоторыми примерами кодов или ресурсов на Net, где я могу начать.
Изменить: более конкретный c с кодом ниже.
import asyncio
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue
from queue import Empty
from time import sleep
class IOProc:
def __init__(self, name, qin, qout):
self.name = name
self.qin = qin
self.qout = qout
async def proc(self):
await asyncio.sleep(1.)
print("in async IO proc")
while True:
try:
msg = self.qin.get(block = False)
if "end" in msg:
task = {"sender" : name, "end" : "end"}
#self.qout.put_nowait(task)
break
else:
task = {"sender" : name, "data" : msg["data"]}
primt(f"msg to put into out queue")
#self.qout.put_nowait(task)
except Empty:
print("Queue is empty. ", Empty)
await asyncio.sleep(0.1)
# maybe we should break the loop as the queue is empty
pass
except KeyboardInterrupt:
break
self.qin.task_done()
return
def run(self):
print(f"{self.name} process started")
asyncio.run(self.proc())
class Controller:
def __init__(self, name, numbers):
self.name = name
self.q1 = Queue()
self.q2 = Queue()
self.numbers = numbers
def run(self):
print("controller started")
for num in self.numbers:
self.q1.put({"sender" : self.name, "number" : num})
self.q1.put({"sender" : self.name, "end" : "end"})
def closeQueues(self):
self.q1.close()
self.q2.close()
def main():
NUMBERS = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773]
controller = Controller(name = "controller", numbers = NUMBERS)
futures = []
with ProcessPoolExecutor(3) as executor:
print("initialization")
worker = IOProc(name = "reader-01", qin = controller.q1, qout = controller.q2)
futures.append(executor.submit(worker.run()))
print("executing the controller")
worker = controller
futures.append(executor.submit(worker.run()))
print("done?")
for future in futures:
print(future.done())
sleep(3)
controller.closeQueues()
if __name__ == '__main__':
main()
1-й рабочий начинает работать, но второй никогда не выполняется ProcessPoolExecutor. В моем понимании это должен быть отдельный процесс, параллельно начатый, но, похоже, нет. Что пошло не так?