Вы можете запустить распределенную систему и передавать данные через центральную систему очередей.Возьмите философию Unix и создайте одно приложение, которое выполняет несколько задач и делает их хорошо.Создайте одно приложение, которое прослушивает пакеты (вы можете использовать scapy
здесь, так как это не имеет большого значения, блокируете ли вы что-либо), затем отправляет их в очередь (RabitMQ, Redis, SQS и т. Д.), А другое приложение обрабатывает пакет изочередь.Этот метод должен дать вам наименьшую головную боль.
Если вам нужно запустить все в одном приложении, то потоки / многопроцессорная обработка - единственный вариант.Но есть некоторые шаблоны проектирования, которым вы захотите следовать.Вы также можете разбить следующий код на отдельные функции и использовать выделенную систему очередей.
from threading import Thread
from time import sleep
from twisted.internet import defer, reactor
class Sniffer(Thread):
def __init__(self, _reactor, shared_queue):
super().__init__()
self.reactor = _reactor
self.shared_queue = shared_queue
def run(self):
"""
Sniffer logic here
"""
while True:
self.reactor.callFromThread(self.shared_queue.put, 'hello world')
sleep(5)
@defer.inlineCallbacks
def consume_from_queue(_id, _reactor, shared_queue):
item = yield shared_queue.get()
print(str(_id), item)
_reactor.callLater(0, consume_from_queue, _id, _reactor, shared_queue)
def main():
shared_queue = defer.DeferredQueue()
sniffer = Sniffer(reactor, shared_queue)
sniffer.daemon = True
sniffer.start()
workers = 4
for i in range(workers):
consume_from_queue(i+1, reactor, shared_queue)
reactor.run()
main()
Класс Sniffer
запускается вне контроля Twisted.Обратите внимание на sniffer.daemon = True
, это так, что поток остановится, когда основной поток остановился.Если он установлен на False
(по умолчанию), то приложение будет закрываться, только если все потоки закончились.В зависимости от поставленной задачи это может или не всегда возможно.Если вы можете сделать перерыв в прослушивании, чтобы проверить событие потока, то вы можете остановить поток более безопасным способом.
self.reactor.callFromThread(self.shared_queue.put, 'hello world')
необходимо, чтобы элемент, помещаемый в очередь, происходил восновной поток реактора, в отличие от потока, выполняемого Sniffer
.Основным преимуществом этого будет то, что будет некоторая синхронизация сообщений, поступающих из потоков (при условии, что вы планируете масштабировать до прослушивания нескольких интерфейсов).Кроме того, я не был уверен, что DeferredQueue
объекты являются потокобезопасными :) Я относился к ним так, как будто они не были.
Поскольку Twisted не управляет потоками в этом случае, жизненно важно, чтобы разработчик делал это.Обратите внимание на петлю worker
и consume_from_queue(i+1, reactor, shared_queue)
.Этот цикл гарантирует, что только желаемое количество работников выполняет задачи.Внутри функции consume_from_queue()
shared_queue.get()
будет ожидать (не блокируя), пока элемент не будет помещен в очередь, напечатает элемент, а затем запланирует еще один consume_from_queue()
.