Я пишу утилиту захвата / анализа пакетов, используя python. Это работает хорошо, но у меня довольно серьезная утечка памяти. Когда я использую профилировщики производительности, в строке cap.next () всегда указывается, где накапливается память. Я перепробовал каждый модуль живого сетевого потока, который смог найти (pypcap, pcapy, scapy, pyshark), и ни один из них не помог утечке. Однако непосредственное использование сокета не только начинается с существенно меньшей площади, но и растет с такой номинальной скоростью, что это сработает.
Однако проблема с сокетом заключается в том, что я пропускаю много пакетов. Я могу сказать, потому что пакеты udp, которые я собираю, имеют порядковый номер в первых нескольких байтах полезной нагрузки, которые я распаковываю с помощью struct. У меня есть tcpdump, работающий рядом, и tcpdump не пропускает ни одного пакета.
Я бы предположил, что причина, по которой я пропускаю так много пакетов, в том, что приемный буфер мал с сокетом. Я увеличил его до максимума, который позволит моя ОС, что в результате экспериментов составляет 425984 байта. Кажется, это не имело значения. Мой план состоит в том, чтобы как можно быстрее снять сообщения с провода и поместить их в очередь, где у меня будет больше передышки для обработки пакета. Пока что мне это не удалось. Сначала я реализовал очередь, затем переключился на heapq, но улучшения не увидел.
Вот часть получения моего кода с использованием сокета и heapqueue. От отладки я почти уверен, что отсутствующие пакеты никогда не попадают в кучу.
#list for heap queue
uq []
#last sequence number for gap check
lastSeq = 0
#create raw socket
s = socket.socket( socket.AF_PACKET , socket.SOCK_RAW , socket.ntohs(0x0003))
#check default buffer size
bufsize = s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
print ("Buffer size [Before]:%d" %bufsize)
#increase the buffer
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,450560)
#check that buffer has been increased
bufsize = s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
print ("Buffer size [After]:%d" %bufsize)
#narrow scope to specific interface in effort to cut down on throughput
iface_name = "em1'
s.bind((iface_name,0))
def parse():
eth_length = 14
#wait 2 seconds for queue to fill
time.sleep(2)
while True:
netq = heappop(uq)
packet = netq[0]
ancdata = netq[1]
eth_length = 14
eth_header = packet[:eth_length]
eth = struct.unpack('!6s6sH' , eth_header)
eth_protocol = socket.ntohs(eth[2])
if eth_protocol == 8:
ip_header = packet[eth_length:20+eth_length]
iph = struct.unpack('!BBHHHBBH4s4s' , ip_header)
protocol = iph[6]
if protocol == 17:
try:
MsgSeqNum = struct.unpack_from("<i", udp.body, offset=0)[0]
if (src in lastSeq) and (seqnum != lastSeq[src] + 1):
print('[udpparser] gap detected within parser! last/now',lastSeq[src],seqnum)
lastSeq[src] = seqnum
else:
# print('[udpparser] for good measure, no gap last/now',lastSeq[src],seqnum)
lastSeq[src] = seqnum
netthread = threading.Thread(target=parse, )
netthread.daemon = True
netthread.start()
while True:
packet, ancdata, flags, address = s.recvmsg(65545, 1024)
heappush(uq,(packet,ancdata,))