Мне удалось иметь в основном потоке 30 портов без потери пакетов, но если я хочу прослушать 40 портов, я начинаю терять пакеты. Я могу видеть это с top
, а также watch /proc/net/udp
.
С 30 портами на 100% при использовании процессора и без потери пакетов со скоростью 1124,69 Мб / сек. Но как только я пытаюсь прослушать 31 порт, я начинаю видеть, что ядро сбрасывает пакеты.
Когда я пытаюсь иметь 2 процесса, он хорошо работает в первой части, но я не могу закончить прослушивание 2 asyncore.loops и прочитать данные.
На компьютере, где я тестирую это, я не могу установить Python3 или любую другую библиотеку, например Twisted. Поэтому я должен сделать это с Python 2.7.
Пока я просто пытаюсь рассчитать скорость для каждого порта и общую скорость.
Я думаю, что моя проблема в том, что после запуска цикла для каждого процесса и способа, которым я выполняю объект процесса, я уверен, что что-то упустил.
import os
import logging
import asyncore
import socket
import multiprocessing
import time
import threading
BUFF_SIZE = 4096
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
class PortReader(asyncore.dispatcher):
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.host = host
self.port = port
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
self.bind((host, port))
self.cnt = 0
self.t0 = 0
self.tf = 0
def readable(self):
return True
def writable(self):
return False
def handle_connect(self):
logging.debug("%d -- Connected to: %s, %d" % (os.getpid(), self.host, self.port))
self.t0 = time.time()
def handle_read(self):
data = self.recv(BUFF_SIZE)
if data:
self.cnt+=len(data)
def handle_close(self):
logging.debug("%d -- Desconnected from: %s, %d with %d" % (os.getpid(),self.host, self.port, self.cnt))
self.close()
self.tf = time.time() - self.t0
class Process(multiprocessing.Process):
def __init__(self, cpu_number):
super(Process, self).__init__()
self.map = {}
self.data_ports = range(8080 + (40/CPUS_TO_READ) * cpu_number, 8080 + (40/CPUS_TO_READ) * (cpu_number + 1))
self.listeners =[]
self.rate ={}
def run(self):
for port in self.data_ports:
self.listeners.append(PortReader('192.168.0.1', port))
self.map = asyncore.socket_map
asyncore.loop(timeout=0.001, use_poll=False, map=self.map)
#self.map = asyncore.socket_map
#logging.debug(self.map)
#self.thread= threading.Thread(target=asyncore.loop, kwargs= {'timeout':0.1, 'map':self.map}) # thread here also did not work
#self.thread.start()
def stop(self):
logging.debug(asyncore.socket_map) # empty
logging.debug(self.listeners) # empty...
for listener in self.listeners:
logging.debug(listener)
listener.handle_close()
#self.thread.join()
def calculate_results(self):
for listener in self.listeners:
port = listener.port # I can read this Ok
tf = listener.tf # I can read this Ok
tbytes = listener.cnt # This one is empty
self.rate[port] = tbytes/tf
if __name__=='__main__':
logging.debug(os.getpid())
CPUS_TO_READ = 2
adqtime = 60
processes = [Process(cpu) for cpu in range(CPUS_TO_READ)]
[p.start() for p in processes]
time.sleep(adqtime)
#print maps
data = {}
for process in processes:
#logging.debug(process.map)
process.stop()
process.calculate_results()
data.update(process.rate)
process.join()
print data # empty
for port in range(8080, 8080+40):
print "{0:.2f} B/seg on port {1}".format(data[port], port)