Введение многопоточности не уменьшает время выполнения программы Python - PullRequest
0 голосов
/ 07 ноября 2019

Я новичок в python и впервые использую его для обработки файлов pcap. До сих пор я пришел с программой, которая отфильтровывает пакеты, которые принадлежат определенному IP и PROTOCOL, и записывает их в новый файл pcap.

from scapy.all import *
import re
import glob

def process_pcap(path, hosts, ports):
    pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
    count=0;
    for pcap in glob.glob(os.path.join(path, '*.pcapng')):
        print "Reading file", pcap
        packets=rdpcap(pcap)
        for pkt in packets:
            if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
                if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
                    count=count+1
                    print "Writing packets " , count
                    #wrpcap("temp.pcap", pkt)
                    pktdump.write(pkt)


path="\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') #Text file with many ip address
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocols to be added in filter
process_pcap(path, hosts, ports)  

Этот код занимал слишком много времени, так как список IP, которыйон должен соответствовать 1000 IP-адресов, а файлы pcap в каталоге также могут содержать гигабайты. Вот почему необходимо ввести многопоточность. Для этого я изменил код, как показано ниже:

from scapy.all import *
import re
import glob
import threading


def process_packet(pkt, pktdump, packets, ports):
count = 0
if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
            if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
                count=count+1
                print "Writing packets " , count
                #wrpcap("temp.pcap", pkt)
                pktdump.write(pkt)  


def process_pcap(path, hosts, ports):
pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
ts=list()
for pcap in glob.glob(os.path.join(path, '*.pcapng')):
    print "Reading file", pcap
    packets=rdpcap(pcap)
    for pkt in packets:
         t=threading.Thread(target=process_packet,args=(pkt,pktdump, packets,ports,))
         ts.append(t)
         t.start()
for t in ts:
    t.join()    


path="\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') #Text file with many ip address
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocos to be added in filter
process_pcap(path, hosts, ports)  

Но я думаю, что я делаю это не лучшим образом, так как время совсем не сократилось.

Любые предложения, пожалуйста!

РЕДАКТИРОВАТЬ:

Я изменил код в соответствии с ответом, мой плохо, как он работает, но потоки не завершают себя. Все примеры многопоточности в python не требуют явного завершения потока. Пожалуйста, укажите проблему в этом коде;

from scapy.all import *
import re
import glob
import threading
import Queue
import multiprocessing

#global variables declaration

path="\pcaps"
pcapCounter = len(glob.glob1(path,"*.pcapng")) #size of the queue
q = Queue.Queue(pcapCounter) # queue to hold all pcaps in directory
pcap_lock = threading.Lock()
ports=[443] # Protocols to be added in filter


def safe_print(content):
    print "{0}\n".format(content),

def process_pcap (hosts):
    content = "Thread no ", threading.current_thread().name, " in action"
    safe_print(content)
    if not q.empty():
        with pcap_lock:
            content = "IN LOCK ", threading.current_thread().name
            safe_print(content)
            pcap=q.get()

        content = "OUT LOCK", threading.current_thread().name, " and reading packets from ", pcap
        safe_print(content)   
        packets=rdpcap(pcap)


        pktdump = PcapWriter(threading.current_thread().name+".pcapng", append=True, sync=True)
        pList=[]
        for pkt in packets:
            if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
                if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
                    pList.append(pkt)

                    content="Wrting Packets to pcap ", threading.current_thread().name
                    safe_print(content)
                    pktdump.write(pList) 


else:
    content = "DONE!! QUEUE IS EMPTY", threading.current_thread().name
    safe_print(content)


for pcap in glob.glob(os.path.join(path, '*.pcapng')):
    q.put(pcap)

file_ip = open('ip_list.txt', 'r') #Text file with many ip addresses
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
threads = []
cpu = multiprocessing.cpu_count() 
for i in range(cpu):
    t = threading.Thread(target=process_pcap, args=(hosts,), name = i)
    t.start()
    threads.append(t)

for t in threads:
    t.join()


print "Exiting Main Thread"

Вот ответ на вышеуказанную программу;он никогда не печатает «Выход из основного потока»

('Thread no ', 'Thread-1', ' in action')
('Thread no ', '3', ' in action')
('Thread no ', '1', ' in action')
('Thread no ', '2', ' in action')
('IN LOCK ', 'Thread-1')
('IN LOCK ', '3')
('OUT LOCK', 'Thread-1', ' and reading packets from ', 'path to\\test.pcapng')
('OUT LOCK', '3', ' and reading packets from ', 'path to\\test11.pcapng')
('IN LOCK ', '1')
('Wrting Packets to pcap ', '3')
('Wrting Packets to pcap ', 'Thread-1')

РЕДАКТИРОВАТЬ 2: Я заблокировал очередь до проверки длины, и все работает нормально.

Спасибо.

Ответы [ 2 ]

3 голосов
/ 07 ноября 2019

Вы создаете поток для пакета. Это фундаментальная проблема.

Кроме того, вы выполняете шаг ввода-вывода для каждого обработанного пакета, а не записываете пакет пакетов

Вероятно, на вашем ПК установлено от 1 до 10 ядер. Для подсчета пакетов, которые вы обрабатываете, затраты на создание 1000+ потоков превышают значение параллелизма каждого из ваших ядер. Существует очень быстрый закон убывающих возвратов, чтобы иметь больше работающих потоков, чем доступных ядер.

Вот лучший подход, при котором вы сможете реализовать преимущества параллелизма.

Основной поток создает глобальную очередь иблокировка для совместного использования последующими потоками. Перед созданием каких-либо потоков основной поток перечисляет список файлов *.pcapng и помещает каждое имя файла в очередь. Он также читает список IP-адресов, который также используется для фильтрации пакетов.

Затем создаются N потоков. Где N - количество ядер на вашем устройстве (N = os.cpu_count()).

Каждый поток вводит блокировку, чтобы вытолкнуть следующий файл из очереди, установленной основным потоком, а затем снять блокировку. Затем поток считывает файл в список packets и удаляет те, которые ему не нужны. Затем сохраните обратно в отдельный уникальный файл, который представляет отфильтрованные результаты для исходного входного файла. В идеале объект pktdump поддерживает запись более 1 пакета за раз, так как пакетные операции ввода-вывода экономят много времени.

После того, как поток обработает один файл, он снова входит в блокировку, извлекает следующий файл из очереди, снимает блокировку и повторяет обработку для следующего файла.

поток завершается, когда очередь имен файлов становится пустой.

Основной поток ожидает завершения всех N потоков. Теперь у вас есть целый набор файлов K, которые вы хотите объединить. Вашему главному потоку нужно только повторно открыть эти K-файлы, созданные потоками, и объединить каждый обратно в один выходной файл.

2 голосов
/ 07 ноября 2019

Вот как Python работает с потоками, читайте о GIL . Если вы хотите сделать это параллельно, вы должны использовать multiprocessing

...