сочетание asyncio с многопроцессорностью - PullRequest
0 голосов
/ 01 августа 2020

Я пробую несколько подходов для своего хобби-проекта, который представляет собой приложение для обработки потоков данных. Он получает данные из нескольких потоков и обрабатывает их параллельно. Есть также очень тяжелые задачи с CPU. Процессы должны взаимодействовать через очереди. Многие процессы сохраняют данные в хранилище ключей / значений LMDB. Компоненты:

  • считыватели: чтение из сокетов или из баз данных фрагментами
  • обработчики данных: тяжелые вычисления, получение данных от считывателей, отправка результата через очереди
  • экспортеры: получение данных от процессоров данных и сохранение их, например, в файлы CSV. Контроллер приказывает, когда это делать.
  • агрегаторы данных: также получают данные от процессоров данных, преобразуют их и сохраняют в LMDB.
  • запрос: процесс, который взаимодействует с одним пользователем (в основном это другое приложение), которое считывает данные из нескольких агрегаторов данных
  • контроллер: отслеживает все и управляет процессами, при необходимости запускает новые процессы, приостанавливает / перезапускает их и т. д. c.

Итак, все компоненты делают разные вещи и общаются друг с другом через очереди (или сокеты).

Создание нескольких процессов с многопроцессорностью - это нормально, уже протестировано. Также работал с asyncio: все было полностью разделено и выполнялось в нескольких командных окнах, и обменивались данными через сокеты, запуская сервер asynio.

Теперь пришло время как-то их объединить, потому что multipro c становится медленнее (не каждый процессы требуют больших вычислений, а также имеют место значительные простои ввода-вывода). Также хочу перейти из сокетов TCP в очереди, чтобы уменьшить использование портов, которые в противном случае должны быть зашифрованы и, надеюсь, также получить некоторую скорость. Но я понятия не имею, как это сделать. Возможно ли или разумно запускать новые циклы событий в каждом процессе? Если да, то какие очереди использовать? Если возможно, я бы хотел избежать использования сторонних библиотек, которые некоторое время не поддерживаются. Все еще использую 3.7.7 Python. Пожалуйста, помогите мне с некоторыми примерами кодов или ресурсов на Net, где я могу начать.

Изменить: более конкретный c с кодом ниже.

import asyncio
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue
from queue import Empty
from time import sleep

class IOProc:
    def __init__(self, name, qin, qout):
        self.name = name
        self.qin = qin
        self.qout = qout

    async def proc(self):
        await asyncio.sleep(1.)
        print("in async IO proc")

        while True:
            try:
                msg = self.qin.get(block = False)
                if "end" in msg:
                    task = {"sender" : name, "end" : "end"}
                    #self.qout.put_nowait(task)
                    break
                else:
                    task =  {"sender" : name, "data" : msg["data"]}
                    primt(f"msg to put into out queue")
                    #self.qout.put_nowait(task)
            except Empty:
                print("Queue is empty. ", Empty)
                await asyncio.sleep(0.1)
                # maybe we should break the loop as the queue is empty
                pass
            except KeyboardInterrupt:
                break
        self.qin.task_done()
        return

    def run(self):
        print(f"{self.name} process started")
        asyncio.run(self.proc())
        

class Controller:
    def __init__(self, name, numbers):
        self.name = name
        self.q1 = Queue()
        self.q2 = Queue()
        self.numbers = numbers

    def run(self):
        print("controller started")
        for num in self.numbers:
            self.q1.put({"sender" : self.name, "number" : num}) 
        self.q1.put({"sender" : self.name, "end" : "end"})
    
    def closeQueues(self):
        
        self.q1.close()
        self.q2.close()


def main():
    NUMBERS = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773]

    controller = Controller(name = "controller", numbers = NUMBERS)
    futures = []
    with ProcessPoolExecutor(3) as executor:
        print("initialization")
        worker = IOProc(name = "reader-01", qin = controller.q1, qout = controller.q2)
        futures.append(executor.submit(worker.run()))
        print("executing the controller")
        worker = controller
        futures.append(executor.submit(worker.run()))
        
    print("done?")
    for future in futures:
        print(future.done())

    sleep(3)
    controller.closeQueues()
    
if __name__ == '__main__':
    main()

1-й рабочий начинает работать, но второй никогда не выполняется ProcessPoolExecutor. В моем понимании это должен быть отдельный процесс, параллельно начатый, но, похоже, нет. Что пошло не так?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...