Как я могу иметь 2 процесса прослушивания 20 портов, каждый из которых имеет собственный asyncore.loop? - PullRequest
0 голосов
/ 11 июля 2019

Мне удалось иметь в основном потоке 30 портов без потери пакетов, но если я хочу прослушать 40 портов, я начинаю терять пакеты. Я могу видеть это с top, а также watch /proc/net/udp. С 30 портами на 100% при использовании процессора и без потери пакетов со скоростью 1124,69 Мб / сек. Но как только я пытаюсь прослушать 31 порт, я начинаю видеть, что ядро ​​сбрасывает пакеты. Когда я пытаюсь иметь 2 процесса, он хорошо работает в первой части, но я не могу закончить прослушивание 2 asyncore.loops и прочитать данные.

На компьютере, где я тестирую это, я не могу установить Python3 или любую другую библиотеку, например Twisted. Поэтому я должен сделать это с Python 2.7.

Пока я просто пытаюсь рассчитать скорость для каждого порта и общую скорость.

Я думаю, что моя проблема в том, что после запуска цикла для каждого процесса и способа, которым я выполняю объект процесса, я уверен, что что-то упустил.

import os
import logging
import asyncore
import socket
import multiprocessing
import time
import threading

BUFF_SIZE = 4096

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

class PortReader(asyncore.dispatcher):
    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.host = host
        self.port = port
        self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.bind((host, port))
        self.cnt = 0
        self.t0 = 0
        self.tf = 0

    def readable(self):
        return True

    def writable(self):
        return False

    def handle_connect(self):
        logging.debug("%d -- Connected to: %s, %d" % (os.getpid(), self.host, self.port))
        self.t0 = time.time()

    def handle_read(self):
        data = self.recv(BUFF_SIZE)
        if data:
            self.cnt+=len(data)

    def handle_close(self):
        logging.debug("%d -- Desconnected from: %s, %d with %d" % (os.getpid(),self.host, self.port, self.cnt))
        self.close()
        self.tf = time.time() - self.t0

class Process(multiprocessing.Process):
    def __init__(self, cpu_number):
        super(Process, self).__init__()
        self.map = {}
    self.data_ports = range(8080 + (40/CPUS_TO_READ) * cpu_number, 8080 + (40/CPUS_TO_READ) * (cpu_number + 1))
    self.listeners =[]
    self.rate ={}

    def run(self):
        for port in self.data_ports:
            self.listeners.append(PortReader('192.168.0.1', port))

        self.map = asyncore.socket_map

        asyncore.loop(timeout=0.001, use_poll=False, map=self.map)

        #self.map = asyncore.socket_map
        #logging.debug(self.map)
        #self.thread= threading.Thread(target=asyncore.loop, kwargs= {'timeout':0.1, 'map':self.map}) # thread here also did not work
        #self.thread.start()

    def stop(self):
        logging.debug(asyncore.socket_map) # empty
        logging.debug(self.listeners) # empty...
        for listener in self.listeners:
            logging.debug(listener)
            listener.handle_close()
        #self.thread.join()

    def calculate_results(self):
        for listener in self.listeners:
            port = listener.port # I can read this Ok
            tf = listener.tf # I can read this Ok
            tbytes = listener.cnt # This one is empty
            self.rate[port] = tbytes/tf

 if __name__=='__main__':

    logging.debug(os.getpid())

    CPUS_TO_READ = 2

    adqtime = 60

    processes = [Process(cpu) for cpu in range(CPUS_TO_READ)]
    [p.start() for p in processes]

    time.sleep(adqtime)

    #print maps

    data = {}
    for process in processes:
        #logging.debug(process.map)
        process.stop() 
        process.calculate_results()
        data.update(process.rate)
        process.join()
    print data # empty
    for port in range(8080, 8080+40):
        print "{0:.2f} B/seg on port {1}".format(data[port], port)
...