Python многопроцессорность apply_async - PullRequest
0 голосов
/ 09 мая 2020

Я работаю над дизайном приложения, в котором я собираю входящие данные из сокета (работающего в отдельном потоке) и сохраняю их в очереди многопроцессорной обработки. Затем в основном потоке я читаю объекты из очереди и apply_function для каждого из них, используя multiprocessing apply_asyn c.

import time
import os
import stat
import json
import struct
import socketserver
from datetime import date

import threading
from multiprocessing import cpu_count, Pool, Queue 

from decouple import config

# Global Variable declarations
q = Queue() # Multiprocessed Queue

class RecvFileDataStreamHandler(socketserver.StreamRequestHandler):
    '''Handler for a recieving request and adding data into queues'''

    def handle(self):
        '''
        Handle multiple requests - each 4-byte length,
        followed by the converting data to dict and add in queues
        '''

        # Code to Handle incoming requests
        # Unpickle received data and store it in multiproessing
        q.put(data_dict)

class FileDataReceiver(socketserver.ThreadingTCPServer):
    daemon_threads = True

    def __init__(self, host=HOST, port=WATCHER_PORT, handler=RecvFileDataStreamHandler):
        print(f"Started listening for file_data on {(host, port)}")
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)


def process_file(_file):
    # Main worker process


def callback_func(result):
    # Do something with data

def error_func(result):
    print("ERROR CALLBACK")


if __name__=="__main__":
    recv = FileDataReceiver()
    threading.Thread(target=recv.serve_forever, daemon=True).start()


    # Infinite Main Thread to process files in file_q
    while True:
        if q.qsize():
            result = Pool().apply_async(process_file, args=(file_q.get(False),), callback=callback_func, error_callback=error_func)

Проблема здесь в том, что данные будут непрерывными, поэтому мне нужно отслеживать бесконечно, поэтому я не может закрыть Pool. Без закрытия пула он порождает несколько дочерних процессов, которые превышают допустимый предел на машине Linux. Есть ли какое-то обходное решение или лучший дизайн приложения, который я не могу понять?

...