Я работаю над дизайном приложения, в котором я собираю входящие данные из сокета (работающего в отдельном потоке) и сохраняю их в очереди многопроцессорной обработки. Затем в основном потоке я читаю объекты из очереди и apply_function для каждого из них, используя multiprocessing apply_asyn c.
import time
import os
import stat
import json
import struct
import socketserver
from datetime import date
import threading
from multiprocessing import cpu_count, Pool, Queue
from decouple import config
# Global Variable declarations
q = Queue() # Multiprocessed Queue
class RecvFileDataStreamHandler(socketserver.StreamRequestHandler):
'''Handler for a recieving request and adding data into queues'''
def handle(self):
'''
Handle multiple requests - each 4-byte length,
followed by the converting data to dict and add in queues
'''
# Code to Handle incoming requests
# Unpickle received data and store it in multiproessing
q.put(data_dict)
class FileDataReceiver(socketserver.ThreadingTCPServer):
daemon_threads = True
def __init__(self, host=HOST, port=WATCHER_PORT, handler=RecvFileDataStreamHandler):
print(f"Started listening for file_data on {(host, port)}")
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
def process_file(_file):
# Main worker process
def callback_func(result):
# Do something with data
def error_func(result):
print("ERROR CALLBACK")
if __name__=="__main__":
recv = FileDataReceiver()
threading.Thread(target=recv.serve_forever, daemon=True).start()
# Infinite Main Thread to process files in file_q
while True:
if q.qsize():
result = Pool().apply_async(process_file, args=(file_q.get(False),), callback=callback_func, error_callback=error_func)
Проблема здесь в том, что данные будут непрерывными, поэтому мне нужно отслеживать бесконечно, поэтому я не может закрыть Pool. Без закрытия пула он порождает несколько дочерних процессов, которые превышают допустимый предел на машине Linux. Есть ли какое-то обходное решение или лучший дизайн приложения, который я не могу понять?