Я новичок в python и впервые использую его для обработки файлов pcap. До сих пор я пришел с программой, которая отфильтровывает пакеты, которые принадлежат определенному IP и PROTOCOL, и записывает их в новый файл pcap.
from scapy.all import *
import re
import glob
def process_pcap(path, hosts, ports):
pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
count=0;
for pcap in glob.glob(os.path.join(path, '*.pcapng')):
print "Reading file", pcap
packets=rdpcap(pcap)
for pkt in packets:
if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
count=count+1
print "Writing packets " , count
#wrpcap("temp.pcap", pkt)
pktdump.write(pkt)
path="\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') #Text file with many ip address
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocols to be added in filter
process_pcap(path, hosts, ports)
Этот код занимал слишком много времени, так как список IP, которыйон должен соответствовать 1000 IP-адресов, а файлы pcap в каталоге также могут содержать гигабайты. Вот почему необходимо ввести многопоточность. Для этого я изменил код, как показано ниже:
from scapy.all import *
import re
import glob
import threading
def process_packet(pkt, pktdump, packets, ports):
count = 0
if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
count=count+1
print "Writing packets " , count
#wrpcap("temp.pcap", pkt)
pktdump.write(pkt)
def process_pcap(path, hosts, ports):
pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
ts=list()
for pcap in glob.glob(os.path.join(path, '*.pcapng')):
print "Reading file", pcap
packets=rdpcap(pcap)
for pkt in packets:
t=threading.Thread(target=process_packet,args=(pkt,pktdump, packets,ports,))
ts.append(t)
t.start()
for t in ts:
t.join()
path="\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') #Text file with many ip address
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocos to be added in filter
process_pcap(path, hosts, ports)
Но я думаю, что я делаю это не лучшим образом, так как время совсем не сократилось.
Любые предложения, пожалуйста!
РЕДАКТИРОВАТЬ:
Я изменил код в соответствии с ответом, мой плохо, как он работает, но потоки не завершают себя. Все примеры многопоточности в python не требуют явного завершения потока. Пожалуйста, укажите проблему в этом коде;
from scapy.all import *
import re
import glob
import threading
import Queue
import multiprocessing
#global variables declaration
path="\pcaps"
pcapCounter = len(glob.glob1(path,"*.pcapng")) #size of the queue
q = Queue.Queue(pcapCounter) # queue to hold all pcaps in directory
pcap_lock = threading.Lock()
ports=[443] # Protocols to be added in filter
def safe_print(content):
print "{0}\n".format(content),
def process_pcap (hosts):
content = "Thread no ", threading.current_thread().name, " in action"
safe_print(content)
if not q.empty():
with pcap_lock:
content = "IN LOCK ", threading.current_thread().name
safe_print(content)
pcap=q.get()
content = "OUT LOCK", threading.current_thread().name, " and reading packets from ", pcap
safe_print(content)
packets=rdpcap(pcap)
pktdump = PcapWriter(threading.current_thread().name+".pcapng", append=True, sync=True)
pList=[]
for pkt in packets:
if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
pList.append(pkt)
content="Wrting Packets to pcap ", threading.current_thread().name
safe_print(content)
pktdump.write(pList)
else:
content = "DONE!! QUEUE IS EMPTY", threading.current_thread().name
safe_print(content)
for pcap in glob.glob(os.path.join(path, '*.pcapng')):
q.put(pcap)
file_ip = open('ip_list.txt', 'r') #Text file with many ip addresses
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
threads = []
cpu = multiprocessing.cpu_count()
for i in range(cpu):
t = threading.Thread(target=process_pcap, args=(hosts,), name = i)
t.start()
threads.append(t)
for t in threads:
t.join()
print "Exiting Main Thread"
Вот ответ на вышеуказанную программу;он никогда не печатает «Выход из основного потока»
('Thread no ', 'Thread-1', ' in action')
('Thread no ', '3', ' in action')
('Thread no ', '1', ' in action')
('Thread no ', '2', ' in action')
('IN LOCK ', 'Thread-1')
('IN LOCK ', '3')
('OUT LOCK', 'Thread-1', ' and reading packets from ', 'path to\\test.pcapng')
('OUT LOCK', '3', ' and reading packets from ', 'path to\\test11.pcapng')
('IN LOCK ', '1')
('Wrting Packets to pcap ', '3')
('Wrting Packets to pcap ', 'Thread-1')
РЕДАКТИРОВАТЬ 2: Я заблокировал очередь до проверки длины, и все работает нормально.
Спасибо.