Как эффективно буферизовать пакеты udp из необработанного сокета в python? - PullRequest
0 голосов
/ 30 апреля 2019

Я пишу утилиту захвата / анализа пакетов, используя python. Это работает хорошо, но у меня довольно серьезная утечка памяти. Когда я использую профилировщики производительности, в строке cap.next () всегда указывается, где накапливается память. Я перепробовал каждый модуль живого сетевого потока, который смог найти (pypcap, pcapy, scapy, pyshark), и ни один из них не помог утечке. Однако непосредственное использование сокета не только начинается с существенно меньшей площади, но и растет с такой номинальной скоростью, что это сработает.

Однако проблема с сокетом заключается в том, что я пропускаю много пакетов. Я могу сказать, потому что пакеты udp, которые я собираю, имеют порядковый номер в первых нескольких байтах полезной нагрузки, которые я распаковываю с помощью struct. У меня есть tcpdump, работающий рядом, и tcpdump не пропускает ни одного пакета.

Я бы предположил, что причина, по которой я пропускаю так много пакетов, в том, что приемный буфер мал с сокетом. Я увеличил его до максимума, который позволит моя ОС, что в результате экспериментов составляет 425984 байта. Кажется, это не имело значения. Мой план состоит в том, чтобы как можно быстрее снять сообщения с провода и поместить их в очередь, где у меня будет больше передышки для обработки пакета. Пока что мне это не удалось. Сначала я реализовал очередь, затем переключился на heapq, но улучшения не увидел.

Вот часть получения моего кода с использованием сокета и heapqueue. От отладки я почти уверен, что отсутствующие пакеты никогда не попадают в кучу.

#list for heap queue
uq []
#last sequence number for gap check
lastSeq = 0

#create raw socket 
s = socket.socket( socket.AF_PACKET , socket.SOCK_RAW , socket.ntohs(0x0003))
#check default buffer size
bufsize = s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
print ("Buffer size [Before]:%d" %bufsize)
#increase the buffer
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,450560)
#check that buffer has been increased 
bufsize = s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
print ("Buffer size [After]:%d" %bufsize)
#narrow scope to specific interface in effort to cut down on throughput
iface_name = "em1'
s.bind((iface_name,0))


def parse():
    eth_length = 14
#wait 2 seconds for queue to fill
    time.sleep(2)
    while True:
       netq = heappop(uq)
       packet = netq[0]
       ancdata = netq[1]

       eth_length = 14
       eth_header = packet[:eth_length]
       eth = struct.unpack('!6s6sH' , eth_header)
       eth_protocol = socket.ntohs(eth[2])
       if eth_protocol == 8:
           ip_header = packet[eth_length:20+eth_length]
           iph = struct.unpack('!BBHHHBBH4s4s' , ip_header)
           protocol = iph[6]
           if protocol == 17:
               try:
                    MsgSeqNum = struct.unpack_from("<i", udp.body, offset=0)[0]
                    if (src in lastSeq) and (seqnum != lastSeq[src] + 1):
                        print('[udpparser] gap detected within parser! last/now',lastSeq[src],seqnum)
                        lastSeq[src] = seqnum
                    else:
#                        print('[udpparser] for good measure, no gap last/now',lastSeq[src],seqnum)
                         lastSeq[src] = seqnum       

netthread = threading.Thread(target=parse, )
netthread.daemon = True
netthread.start()


while True:
    packet, ancdata, flags, address = s.recvmsg(65545, 1024)

    heappush(uq,(packet,ancdata,))

...